Environment: Hadoop 2.7.5 + Flink 1.4 + Kafka 0.10
I have set up a real-time data processing project. I use the Flink Table source API (Kafka010JsonTableSource) as the table source: read data from Kafka, run a SQL query on it, and finally write the result to another Kafka topic. It is a straightforward pipeline, but I hit exceptions when I ran it on the Flink cluster. Below is my main code:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
env.enableCheckpointing(5000)

val tableEnv = TableEnvironment.getTableEnvironment(env)

val tableSource: KafkaTableSource = Kafka010JsonTableSource.builder()
  .forTopic(kafkaConfig.topic)
  .withKafkaProperties(props)
  .withSchema(dynamicJsonSchema)
  .withRowtimeAttribute(
    enventTimeFieldName,
    new ExistingField(enventTimeFieldName),
    new MyBoundedOutOfOrderTimestamps(100L))
  .build()

tableEnv.registerTableSource(tableName, tableSource)

val tableResult: Table = tableEnv.sqlQuery(sql)
tableResult.writeToSink(new Kafka010JsonTableSink(kafkaOutput.topic, props))
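For reference, a minimal sketch of how the `props` referenced above is built; the broker address and group id here are placeholder values, not my real configuration:

// Sketch of the `props` passed to the source and sink above.
// bootstrap.servers and group.id are placeholders, not my real values.
val props = new java.util.Properties()
props.setProperty("bootstrap.servers", "kafka-broker:9092") // placeholder
props.setProperty("group.id", "flink-sql-consumer")         // placeholder
// plus the consumer tuning keys listed later in this question, e.g.:
props.setProperty("session.timeout.ms", "300000")
props.setProperty("max.poll.interval.ms", "500000")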
I have already enabled checkpointing. The first time I ran the job on Flink, I just kept the default consumer configuration. After the Flink task was running, I checked the offsets with the Kafka shell command (kafka-consumer-groups.sh) and found a strange situation. Based on the shell command output and the logs from the Flink task manager, the offsets were committed successfully for the first several seconds, but afterwards I kept hitting many exceptions like the ones below:
2018-01-19 09:24:03,174 WARN  org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1096)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:247)

2018-01-19 09:24:03,174 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Async Kafka commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    (stack trace identical to the one above)
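For context, I checked the offsets with a command along these lines; the exact flags are from memory (Kafka 0.10 needs the --new-consumer switch for groups using the new consumer API), and the broker address and group id are placeholders:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka-broker:9092 --describe --group <group.id>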
So I searched for a solution based on the error message above. Some people said I should increase session.timeout.ms; I followed that advice, but it still failed. After that I tried many combinations of the configurations below. The Kafka offsets were always committed successfully at the beginning, but the commits would start failing later. I really do not know how to solve it, would you help me fix it? Thanks very much!

The Kafka consumer configuration combinations I tried are as follows:
[
  { "propertyKey": "session.timeout.ms",      "propertyValue": "300000" },
  { "propertyKey": "request.timeout.ms",      "propertyValue": "505000" },
  { "propertyKey": "auto.commit.interval.ms", "propertyValue": "10000" },
  { "propertyKey": "max.poll.records",        "propertyValue": "50" },
  { "propertyKey": "max.poll.interval.ms",    "propertyValue": "500000" },
  { "propertyKey": "client.id",               "propertyValue": "taxi-client-001" },
  { "propertyKey": "heartbeat.interval.ms",   "propertyValue": "99000" }
]
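These key/value pairs are loaded into the consumer Properties before the table source is built. A minimal sketch of that step, assuming the `props` object shown earlier; the case class and variable names are mine, for illustration only:

// Applying the propertyKey/propertyValue pairs above to the consumer Properties.
// ConsumerProperty and combo are illustrative names, not from my job code.
case class ConsumerProperty(propertyKey: String, propertyValue: String)

val combo = Seq(
  ConsumerProperty("session.timeout.ms", "300000"),
  ConsumerProperty("request.timeout.ms", "505000"),
  ConsumerProperty("auto.commit.interval.ms", "10000"),
  ConsumerProperty("max.poll.records", "50"),
  ConsumerProperty("max.poll.interval.ms", "500000"),
  ConsumerProperty("client.id", "taxi-client-001"),
  ConsumerProperty("heartbeat.interval.ms", "99000")
)

combo.foreach(p => props.setProperty(p.propertyKey, p.propertyValue))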
I tried changing the above configurations to all kinds of values, but everything failed, even when I configured them according to the official Kafka documentation. I hope you can help me fix the error above, thanks so much!
Comments:

Have you tried setting max.poll.interval.ms to some value slightly above the average processing time for each record? – amethystic

My max.poll.interval.ms is 500000, and its default value is 305000, so I think 500000 is enough. Do you think this property should be larger? – Gene
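Update: to double-check the numbers discussed in the comments, here is a small sanity check of the relationships the Kafka documentation recommends between these timeouts, using the values from my combo above (a sketch, not part of my job code):

// Sanity-check sketch for the timeout relationships, using the values above.
val sessionTimeoutMs    = 300000L
val requestTimeoutMs    = 505000L
val maxPollIntervalMs   = 500000L
val heartbeatIntervalMs = 99000L

// heartbeat.interval.ms is recommended to be no more than 1/3 of session.timeout.ms
assert(heartbeatIntervalMs <= sessionTimeoutMs / 3)
// request.timeout.ms should exceed max.poll.interval.ms, since a single poll
// cycle may legally take up to max.poll.interval.ms
assert(requestTimeoutMs > maxPollIntervalMs)

Both checks pass for my values, which is why I am confused that the commits still fail.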