I have set up a local Kafka 0.10 + Flink 1.4 environment.
I use the code below to consume data from a Kafka topic:
val tableSource: KafkaTableSource = Kafka010JsonTableSource.builder()
  .forTopic(kafkaConfig.topic)
  .withKafkaProperties(props)
  .withSchema(dynamicJsonSchema)
  .withRowtimeAttribute(eventTimeFieldName, new ExistingField(eventTimeFieldName), new BoundedOutOfOrderTimestamps(30000L))
  .build()
tableEnv.registerTableSource(tableName, tableSource)
val tableResult: Table = tableEnv.sqlQuery(sql)
After I execute this code, the following warning message always appears:
Auto-commit of offsets {taxiData-0=OffsetAndMetadata{offset=728461, metadata=''}} failed for group taxiDataGroup: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
No matter which combination of the properties below I set on the Kafka consumer, it always shows the warning message above.
{
"propertyKey": "enable.auto.commit",
"propertyValue": "true"
},
{
"propertyKey": "session.timeout.ms",
"propertyValue": "250000"
},
{
"propertyKey": "request.timeout.ms",
"propertyValue": "305000"
},
{
"propertyKey": "auto.commit.interval.ms",
"propertyValue": "800000"
},
{
"propertyKey": "max.poll.records",
"propertyValue": "300"
},
{
"propertyKey": "max.poll.interval.ms",
"propertyValue": "300000"
}
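For clarity, the property list above corresponds to the `java.util.Properties` object passed to `withKafkaProperties(props)`. A minimal sketch of building it (the object name `KafkaConsumerProps` is mine, not from the original code; the comment about timeout ordering reflects the Kafka client documentation, not something verified against this setup):

```scala
import java.util.Properties

object KafkaConsumerProps {
  // Builds the same consumer properties as the JSON list above.
  // Note (per Kafka client docs): request.timeout.ms should be larger than
  // session.timeout.ms, and the poll loop must return within max.poll.interval.ms,
  // otherwise the consumer is kicked out of the group and commits fail.
  def build(): Properties = {
    val props = new Properties()
    props.setProperty("enable.auto.commit", "true")
    props.setProperty("session.timeout.ms", "250000")
    props.setProperty("request.timeout.ms", "305000")   // > session.timeout.ms
    props.setProperty("auto.commit.interval.ms", "800000")
    props.setProperty("max.poll.records", "300")
    props.setProperty("max.poll.interval.ms", "300000")
    props
  }
}
```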
I am not sure whether Kafka010JsonTableSource in Flink 1.4 auto-commits offsets. My test results indicate that it does not. Can anyone confirm this? Or do you see any other issues in my code?
The Flink documentation says: "if checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing capability of the internally used Kafka clients". So I think the internal Kafka clients should commit the offsets automatically, but in fact I always get the commit-failed warning even though I tried many combinations of the property settings. – Gene
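The quoted doc passage also implies the flip side: when checkpointing is enabled, the Flink Kafka consumer commits offsets to Kafka as part of completed checkpoints and does not rely on the client's auto-commit settings. A minimal sketch of enabling it (untested against this exact setup; assumes the usual `StreamExecutionEnvironment` obtained alongside `tableEnv`):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Sketch: with checkpointing on, offsets are committed back to Kafka when
// each checkpoint completes (this is the consumer's default behavior when
// checkpointing is enabled), so enable.auto.commit / auto.commit.interval.ms
// are no longer the mechanism that persists offsets.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // checkpoint every 60 s
```

If the warning only matters because offsets seem not to be saved, enabling checkpointing may be the more idiomatic Flink fix than tuning the Kafka client's auto-commit properties.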