kafka server and client jars moved to latest library : 0.10.0.1
And my consumer and producer code using latest kafka jars as mentioned above but still using old consumer apis(0.8.2).
I am facing problem on consumer side while calling commit offset.
2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)
kafka server side configuration :
listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false
below configurations for kafka consumer :
auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000
To create consumer I'm using below api:
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
and for commit call
consumer.commitOffsets();
while reading message from kafka, we are using below method to to handle timeout
private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
try
{
it.hasNext();
return true;
}
catch (ConsumerTimeoutException e)
{
return false;
}
}
This is required, as we want to start processing only after specific time interval or size of messages(bytes) received from kafka.
same Exception, even after setting dual.commit.enabled = false consumer.timeout.ms = 1000 where other settings keeping as old configuration.
some more details:
With version 0.8.2.1, I never face such problem. After moving to 0.10.0.1(client as well as server), start getting this exception.
We are reading multiple messages before processing/pushing to hadoop. Processing/Writing to hadoop part takes time(~5 minutes). And after this process when we try to push we are getting above exception. this exception I'm getting on every 2nd commitOffset. And some time(where commitOffset calling withing 10 seconds of previous commit) no exception for 2nd commit.
for your information. if commit offset failed then consumer just reading next messages without going back to last successful commit offset position. but if commit offset failed and restarting consumer process then it is reading from old commit position.