2 votes

Environment: Hadoop 2.7.5 + Flink 1.4 + Kafka 0.10

I have set up a real-time data processing project. I use the Flink Table source API (Kafka010JsonTableSource) as a TableSource: read data from Kafka, run a SQL query on it, and finally write the result to another Kafka topic. It's a straightforward process flow, but I hit exceptions when I ran it on the Flink cluster. Below is my main code:

// Imports assumed for this snippet:
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{Kafka010JsonTableSink, Kafka010JsonTableSource, KafkaTableSource}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.sources.tsextractors.ExistingField

// kafkaConfig, props, dynamicJsonSchema, tableName, sql and kafkaOutput are defined elsewhere
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000)
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Kafka 0.10 JSON table source with an event-time (rowtime) attribute
val tableSource: KafkaTableSource = Kafka010JsonTableSource.builder()
    .forTopic(kafkaConfig.topic)
    .withKafkaProperties(props)
    .withSchema(dynamicJsonSchema)
    .withRowtimeAttribute(
         eventTimeFieldName,                        // name of the rowtime attribute
         new ExistingField(eventTimeFieldName),     // extract the timestamp from an existing field
         new MyBoundedOutOfOrderTimestamps(100L))   // custom bounded out-of-order watermark strategy
    .build()
tableEnv.registerTableSource(tableName, tableSource)

// run the SQL query and write the result to the output Kafka topic
val tableResult: Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))
env.execute()   // submit the streaming job

I have already enabled checkpointing. The first time I ran the job on Flink, I just used the default consumer configuration. After the Flink task was running, I checked the offsets with the Kafka shell command (kafka-consumer-groups.sh) and noticed a strange situation. Based on the shell command output and the logs from the Flink TaskManager, the offsets were committed successfully for the first few seconds, but after that I kept getting many exceptions like the one below:

2018-01-19 09:24:03,174 WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247)

2018-01-19 09:24:03,174 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    (same stack trace as above)

So I searched for a solution based on the error message above, and somebody suggested increasing session.timeout.ms. I followed that advice, but it still failed. After that I tried many combinations of the configurations below: the Kafka offsets were always committed successfully at the beginning, but the commits started failing later. I really don't know how to solve this. Could you help me fix it? Thanks very much!

The Kafka consumer configuration combos are as below:

{ "propertyKey": "session.timeout.ms",      "propertyValue": "300000" },
{ "propertyKey": "request.timeout.ms",      "propertyValue": "505000" },
{ "propertyKey": "auto.commit.interval.ms", "propertyValue": "10000" },
{ "propertyKey": "max.poll.records",        "propertyValue": "50" },
{ "propertyKey": "max.poll.interval.ms",    "propertyValue": "500000" },
{ "propertyKey": "client.id",               "propertyValue": "taxi-client-001" },
{ "propertyKey": "heartbeat.interval.ms",   "propertyValue": "99000" }

I tried changing these configurations to all kinds of values, but every attempt failed, even when I configured them according to the official Kafka documentation. I hope you can help me fix the error above, thanks so much!
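
In case it matters, this is roughly how these settings end up in the props object that is passed to withKafkaProperties (the bootstrap.servers and group.id values below are placeholders, not the real cluster values):

// Sketch of how the consumer properties are assembled for the table source.
// bootstrap.servers and group.id are placeholders.
import java.util.Properties

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka-broker:9092")   // placeholder
props.setProperty("group.id", "flink-input-group")            // placeholder
props.setProperty("session.timeout.ms", "300000")
props.setProperty("request.timeout.ms", "505000")
props.setProperty("auto.commit.interval.ms", "10000")
props.setProperty("max.poll.records", "50")
props.setProperty("max.poll.interval.ms", "500000")
props.setProperty("client.id", "taxi-client-001")
props.setProperty("heartbeat.interval.ms", "99000")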

Did you try increasing max.poll.interval.ms to a value slightly above the average processing time for each record? – amethystic
Yes, in my configuration you can see the value of max.poll.interval.ms is 500000, and its default value is 305000, so I think 500000 is enough. Do you think this property should be even larger? – Gene
Another possible situation is that you have another running consumer group with the same name. Did you check that the group id you specified is unique? – amethystic

1 Answer

3 votes

I found the root cause. The reason the rebalance error kept happening is that the two consumers (one consuming the input data, the other consuming the output data) used the same group name. I suspect a single coordinator does not have enough capacity to handle the offset commits of both consumers. After I changed one consumer's group name, the world was suddenly quiet and the error never happened again.
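
A minimal sketch of the fix, assuming the input and output consumers get separate property sets (the broker address and group names below are just placeholders):

// Give each consumer its own group.id so they no longer share a consumer group.
import java.util.Properties

val inputProps = new Properties()
inputProps.setProperty("bootstrap.servers", "kafka-broker:9092")   // placeholder
inputProps.setProperty("group.id", "flink-input-consumer")         // group for the Kafka table source

val outputProps = new Properties()
outputProps.setProperty("bootstrap.servers", "kafka-broker:9092")  // placeholder
outputProps.setProperty("group.id", "flink-output-consumer")       // different group for the output-topic consumer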