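It behaves as expected for me. The listener sleeps for 12 seconds with max.poll.interval.ms set to 10 seconds, so the consumer leaves the group, the offset commit fails, and the same batch is redelivered after the rebalance: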
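
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So67520619Application {

    public static void main(String[] args) {
        SpringApplication.run(So67520619Application.class, args);
    }

    // Sends one record each time the application starts.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so67520619", "foo");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67520619").partitions(1).replicas(1).build();
    }

}

@Component
class Listener implements ConsumerAwareRebalanceListener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "so67520619", topics = "so67520619")
    public void listen(List<String> in) throws InterruptedException {
        LOG.info(in.toString());
        // Deliberately longer than max.poll.interval.ms (10 seconds).
        Thread.sleep(12000);
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        LOG.info("Revoked: " + partitions);
    }

}
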
spring.kafka.consumer.properties.max.poll.interval.ms=10000
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch
2021-05-13 11:13:16.339 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 11:13:16.358 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo]
2021-05-13 11:13:26.416 INFO 20954 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-c9a440bf-9076-4575-813d-3efb0054f5f7 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 11:13:28.365 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 11:13:28.370 ERROR 20954 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1431) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) ~[spring-kafka-2.6.7.jar:2.6.7]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504) ~[kafka-clients-2.6.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2396) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2391) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2191) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1149) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.7.jar:2.6.7]
... 3 common frames omitted
2021-05-13 11:13:28.371 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 11:13:28.371 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 11:13:28.371 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : Revoked: [so67520619-0]
2021-05-13 11:13:28.372 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.376 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 11:13:28.376 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 11:13:28.485 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 3: {consumer-so67520619-1-15ce3150-1aa3-4b43-a892-fbd54e8ed919=Assignment(partitions=[so67520619-0])}
2021-05-13 11:13:28.486 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 3
2021-05-13 11:13:28.487 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 11:13:28.487 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 11:13:28.488 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 11:13:28.489 INFO 20954 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 11:13:28.490 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 11:13:28.494 INFO 20954 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo]
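
The last lines of that log show why the same batch comes back: the commit failed, so the group has no committed offset for so67520619-0, and auto-offset-reset=earliest sends the consumer back to offset 0. Here is a minimal sketch of that rule in plain Java, with no Kafka dependency. The class and method names are hypothetical; they only model the behavior and are not Kafka's actual code:

```java
import java.util.OptionalLong;

// Hypothetical model of where a consumer resumes after a rebalance.
class OffsetResume {

    /**
     * @param committed       last committed offset for the partition, if any
     * @param autoOffsetReset value of auto.offset.reset ("earliest", "latest" or "none")
     * @param beginning       first offset available in the partition
     * @param end             offset just past the last record
     */
    static long resumePosition(OptionalLong committed, String autoOffsetReset,
            long beginning, long end) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset always wins
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginning;
            case "latest":
                return end;
            default:
                // "none": the real consumer raises an error instead of guessing
                throw new IllegalStateException("no committed offset and no reset policy");
        }
    }

    public static void main(String[] args) {
        // The run above: 4 records, no successful commit, reset policy "earliest".
        System.out.println(resumePosition(OptionalLong.empty(), "earliest", 0, 4)); // prints 0
        // Had a commit succeeded after the batch, the consumer would resume past it.
        System.out.println(resumePosition(OptionalLong.of(4), "earliest", 0, 4)); // prints 4
    }
}
```

The batch would be redelivered after a failed commit either way; the reset policy only comes into play here because no commit has ever succeeded for this group.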
EDIT
With max.poll.interval.ms at 60 seconds and a 65-second sleep in the listener, it still works for me:
2021-05-13 17:24:28.111 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 17:24:28.130 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo, foo, foo]
2021-05-13 17:25:28.147 INFO 37063 --- [ad | so67520619] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Member consumer-so67520619-1-269ac261-3838-4925-a9a7-fd0687db3522 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-05-13 17:25:33.135 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Failing OffsetCommit request since the consumer is not part of an active group
2021-05-13 17:25:33.141 ERROR 37063 --- [o67520619-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
...
2021-05-13 17:25:33.141 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-05-13 17:25:33.141 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Lost previously assigned partitions so67520619-0
2021-05-13 17:25:33.141 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : Revoked: [so67520619-0]
2021-05-13 17:25:33.142 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.145 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-05-13 17:25:33.145 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] (Re-)joining group
2021-05-13 17:25:33.250 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Finished assignment for group at generation 9: {consumer-so67520619-1-bd22a252-64f2-4be3-a6eb-8371b8f95ff2=Assignment(partitions=[so67520619-0])}
2021-05-13 17:25:33.254 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Successfully joined group with generation 9
2021-05-13 17:25:33.254 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Notifying assignor about the new Assignment(partitions=[so67520619-0])
2021-05-13 17:25:33.255 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Adding newly assigned partitions: so67520619-0
2021-05-13 17:25:33.256 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Found no committed offset for partition so67520619-0
2021-05-13 17:25:33.258 INFO 37063 --- [o67520619-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-so67520619-1, groupId=so67520619] Resetting offset for partition so67520619-0 to offset 0.
2021-05-13 17:25:33.258 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : Assigned: [so67520619-0]
2021-05-13 17:25:33.261 INFO 37063 --- [o67520619-0-C-1] com.example.demo.Listener : [foo, foo, foo, foo, foo, foo]
Kafka complains after 60 seconds (the LeaveGroup request at 17:25:28), and the commit fails 5 seconds later (17:25:33), when the listener returns.
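
To make the timing in that log explicit, here is a rough sketch of the arithmetic in plain Java. The class and method names are hypothetical, and the times are approximations that ignore the few milliseconds between the poll and the listener call:

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper that models when the two log events are expected.
class PollTimeline {

    /** True when the listener holds the consumer thread longer than max.poll.interval.ms. */
    static boolean exceedsPollInterval(Duration maxPollInterval, Duration listenerTime) {
        return listenerTime.compareTo(maxPollInterval) > 0;
    }

    /** Approximate time at which the heartbeat thread sends the LeaveGroup request. */
    static LocalTime leaveGroupAt(LocalTime lastPoll, Duration maxPollInterval) {
        return lastPoll.plus(maxPollInterval);
    }

    /** Approximate time at which the listener returns and the commit is attempted. */
    static LocalTime commitAttemptAt(LocalTime lastPoll, Duration listenerTime) {
        return lastPoll.plus(listenerTime);
    }

    public static void main(String[] args) {
        LocalTime lastPoll = LocalTime.parse("17:24:28");     // batch delivered to the listener
        Duration maxPollInterval = Duration.ofSeconds(60);    // max.poll.interval.ms=60000
        Duration listenerTime = Duration.ofSeconds(65);       // Thread.sleep(65000)

        System.out.println(exceedsPollInterval(maxPollInterval, listenerTime)); // prints true
        System.out.println(leaveGroupAt(lastPoll, maxPollInterval));            // prints 17:25:28
        System.out.println(commitAttemptAt(lastPoll, listenerTime));            // prints 17:25:33
    }
}
```

The same arithmetic with 10 and 12 seconds gives the 11:13:26 and 11:13:28 timestamps in the first log.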