
Kafka server and client jars were moved to the latest library: 0.10.0.1.

My consumer and producer code uses the latest Kafka jars mentioned above, but is still using the old consumer APIs (0.8.2).

I am facing a problem on the consumer side while committing offsets.

2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
    at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
    at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
    at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)

Kafka server-side configuration:

listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false

Below is the Kafka consumer configuration (a sketch of setting these properties in code follows the list):

auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000
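
For reference, a minimal sketch of setting these overrides in code (propForHLConsumer is the same Properties object passed to ConsumerConfig below; the zookeeper.connect address is a placeholder, the rest mirror the values above):

Properties propForHLConsumer = new Properties();
propForHLConsumer.put("zookeeper.connect", "zkhost:2181"); // placeholder address
propForHLConsumer.put("group.id", "crm_topic1_hadoop_tables");
propForHLConsumer.put("auto.commit.enable", "false");
propForHLConsumer.put("auto.offset.reset", "smallest");
propForHLConsumer.put("consumer.timeout.ms", "100");
propForHLConsumer.put("dual.commit.enabled", "true");
propForHLConsumer.put("fetch.message.max.bytes", "209715200");
propForHLConsumer.put("offsets.storage", "kafka");
propForHLConsumer.put("rebalance.backoff.ms", "6000");
propForHLConsumer.put("zookeeper.session.timeout.ms", "23000");
propForHLConsumer.put("zookeeper.sync.time.ms", "2000");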

To create the consumer I'm using the below API:

ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));

and for the commit call:

consumer.commitOffsets();

While reading messages from Kafka, we are using the below method to handle the consumer timeout:

private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
    try
    {
        // blocks until a message is available or consumer.timeout.ms elapses
        return it.hasNext();
    }
    catch (ConsumerTimeoutException e)
    {
        // no message arrived within consumer.timeout.ms
        return false;
    }
}

This is required, as we want to start processing only after a specific time interval has elapsed or a certain volume of messages (bytes) has been received from Kafka.
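
A rough sketch of that read loop (illustrative only: stream, MAX_BATCH_BYTES, MAX_WAIT_MS and writeToHadoop are placeholder names, not from the original code):

// Accumulate messages until a byte-size or time threshold is reached,
// then process the batch and commit the offsets.
ConsumerIterator<byte[], byte[]> it = stream.iterator(); // stream: KafkaStream<byte[], byte[]>
List<byte[]> batch = new ArrayList<byte[]>();
long batchBytes = 0;
long batchStart = System.currentTimeMillis();

while (true)
{
    if (hasNext(it)) // swallows ConsumerTimeoutException, see method above
    {
        byte[] msg = it.next().message();
        batch.add(msg);
        batchBytes += msg.length;
    }
    boolean sizeReached = batchBytes >= MAX_BATCH_BYTES;
    boolean timeReached = System.currentTimeMillis() - batchStart >= MAX_WAIT_MS;
    if (!batch.isEmpty() && (sizeReached || timeReached))
    {
        writeToHadoop(batch);       // the ~5 minute processing/writing step
        consumer.commitOffsets();   // the call that fails with EOFException
        batch.clear();
        batchBytes = 0;
        batchStart = System.currentTimeMillis();
    }
}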

I get the same exception even after setting dual.commit.enabled = false and consumer.timeout.ms = 1000, keeping the other settings unchanged.

Some more details:

With version 0.8.2.1, I never faced this problem. After moving to 0.10.0.1 (client as well as server), I started getting this exception.

We are reading multiple messages before processing/pushing to Hadoop. The processing/writing to Hadoop takes time (~5 minutes), and when we try to commit after this step we get the above exception. I get this exception on every 2nd commitOffsets call, although sometimes (when commitOffsets is called within 10 seconds of the previous commit) the 2nd commit throws no exception.

For your information: if the offset commit fails, the consumer just keeps reading the next messages without going back to the last successfully committed offset. But if the offset commit fails and the consumer process is restarted, it reads from the old committed position.


1 Answer


As I mentioned in the problem details, I'm using the latest Kafka jars but still using the old consumer client:

kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));

I solved this issue by calling commitOffsets a second time.

Actually, the problem is related to connections.max.idle.ms. This property was introduced with the latest Kafka (broker = 10 minutes, consumer = 9 minutes, producer = 9 minutes).

Because of this, whenever my old consumer calls commitOffsets more than 10 minutes after the previous one, I get the above exception.

With the old consumer API there is no way to set this property, and I can't change the broker configuration (it is handled by another team and the same brokers serve other users)...

Here I think the old commitOffsets call uses a separate connection (other than the iterator's), and that connection gets closed when it is idle for more than 10 minutes. I'm not very sure about this.

If any failure happens on the 1st commitOffsets call, the 2nd call makes sure the commit succeeds. And if the 1st one already succeeds, executing the next one causes no problem. In any case, we make very few commit offset calls.
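
A minimal sketch of this workaround (the method name mirrors KafkaHLConsumer.commitOffset from the stack trace; consumer is the ConsumerConnector created above):

// Workaround: always call commitOffsets() twice. If the first call fails because
// the broker closed the idle (>10 minutes) connection, the second call goes over a
// freshly opened connection and succeeds; if the first call already succeeded, the
// extra call is harmless since we commit very rarely anyway.
private void commitOffset()
{
    consumer.commitOffsets(); // may fail with the EOFException logged above
    consumer.commitOffsets(); // second call ensures the offsets are actually committed
}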

Next, I'll move my code to the latest Kafka consumer and producer Java APIs.
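
With the new consumer API, connections.max.idle.ms can be set on the client side, and the client also reconnects transparently when an idle connection is closed. A minimal sketch (broker address and topic name are placeholders):

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // placeholder address
props.put("group.id", "crm_topic1_hadoop_tables");
props.put("enable.auto.commit", "false");
props.put("connections.max.idle.ms", "1800000"); // e.g. 30 minutes, longer than the processing time
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> newConsumer = new KafkaConsumer<byte[], byte[]>(props);
newConsumer.subscribe(Collections.singletonList("topic1")); // placeholder topic
// poll(), process, then newConsumer.commitSync();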