
I have a topic "oranges" with 10 partitions and 2 consumers in 1 consumer group. I am using Spring Kafka.

For various reasons I need to re-read the data from time to time, so I need to reset the offsets. My listener implements ConsumerSeekAware, and in onPartitionsAssigned() I simply call callback#seekToBeginning. This appears to work: in the log I see messages from the Kafka client API (2.3.1) saying:

Resetting offset for partition oranges-X to offset 0.

This message is logged for every partition.

However, in practice only the last partition (9) is actually reset and, from time to time if I get lucky, the second one (1) too. None of the others are reset at all.

What is really giving me a headache is this: if I omit partition 9 from the list of partitions to reset, all the other partitions are reset and everything works as expected.

The code is very simple:

class ... implements ConsumerSeekAware {
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
        callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());

}
...

Logs:

[19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.
Can you share your code or some logs? - Mickael Maison
Hi @MickaelMaison, I have updated the post. Would you like the logs from before or after the log statements above? - Martin Linha
Never heard of anything like this before. If you can provide a small, concise, complete sample that exhibits this behavior, I can take a look. - Gary Russell
Hi @GaryRussell, thanks for the reply. I will provide an example. I have already spent a couple of days on this. What I have found is that it happens only if AckMode is set to BATCH and enable.auto.commit is set to false. If I change enable.auto.commit to true, it works as expected. Could there have been some pending offset commits? The Javadoc of callback#seekToBeginning says: "Queue a seekToBeginning operation to the consumer. The seek will occur after any pending offset commits. The consumer must be currently assigned the specified partition." - Martin Linha
Which version are you using? That javadoc needs to be fixed; in versions since 1.3 the threading was changed (thanks to KIP-62). onPartitionsAssigned is called on the consumer thread from within poll(), and the seeks are now done directly rather than being queued. The seeks are still queued if you save the callback passed to registerSeekCallback and call it from outside onPartitionsAssigned. - Gary Russell

1 Answer


I can't reproduce your issue.

Here is my test Spring Boot application:

@SpringBootApplication
public class So62465345Application extends AbstractConsumerSeekAware {


    private static final Logger LOG = LoggerFactory.getLogger(So62465345Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62465345Application.class, args);
    }

    @KafkaListener(id = "so62465345", topics = "so62465345")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62465345").partitions(10).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
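        // One record per partition; the upper bound of range() is exclusive, so 10 covers partitions 0..9
        return args -> IntStream.range(0, 10).forEach(i -> template.send("so62465345", i, null,
                System.currentTimeMillis() + ":foo:" + i));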
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        try {
            Thread.sleep(5000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("Seeking on assignment");
        callback.seekToBeginning(assignments.keySet());
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        LOG.info("Seeking on idle");
        callback.seekToBeginning(assignments.keySet());
    }

}

and these properties:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=30000
spring.kafka.listener.poll-timeout=2000

I set a breakpoint in onIdleContainer and, using kafka-console-consumer, I can see that the offsets are not actually reset until the next poll().

The message "Seeking to EARLIEST offset of partition so62465345-1" appears when we perform the seek, but "Resetting offset for partition so62465345-0 to offset 0" doesn't appear until poll() is called again (and then the offsets are actually reset).

So I do see that the seeks don't occur on the current poll, which returns 0 records, but the next poll starts from the beginning.
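
To make that ordering concrete, here is a minimal toy model in plain Java. It is not the Kafka client and not Spring Kafka; ToyConsumer, pendingReset and lastResolvedPosition are hypothetical names invented for this sketch. It only illustrates the behaviour described above, assuming that a seek-to-beginning request is merely recorded and is resolved at the start of the next poll (the KafkaConsumer Javadoc describes seekToBeginning as evaluating lazily).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only: NOT the Kafka client, just an illustration of a deferred reset.
class LazySeekModel {

    static class ToyConsumer {
        // partition -> records stored in that partition
        private final Map<Integer, List<String>> log = new HashMap<>();
        // partition -> next offset to read, as last resolved by poll()
        private final Map<Integer, Integer> position = new HashMap<>();
        // partitions with a reset that has been requested but not yet applied
        private final Set<Integer> pendingReset = new HashSet<>();

        void append(int partition, String record) {
            log.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
            position.putIfAbsent(partition, 0);
        }

        // Only records the request; the position is untouched until the next poll().
        void seekToBeginning(int partition) {
            pendingReset.add(partition);
        }

        // Hypothetical accessor for this sketch: the position as of the last poll().
        int lastResolvedPosition(int partition) {
            return position.get(partition);
        }

        List<String> poll() {
            // Step 1: apply any pending resets (the "Resetting offset ..." moment).
            for (int p : pendingReset) {
                position.put(p, 0);
            }
            pendingReset.clear();

            // Step 2: return everything from the current position and advance it.
            List<String> out = new ArrayList<>();
            for (Map.Entry<Integer, List<String>> e : log.entrySet()) {
                List<String> records = e.getValue();
                int pos = position.get(e.getKey());
                out.addAll(records.subList(pos, records.size()));
                position.put(e.getKey(), records.size());
            }
            return out;
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.append(0, "a");
        consumer.append(0, "b");

        System.out.println(consumer.poll());                  // [a, b]
        System.out.println(consumer.poll());                  // [] (nothing new, like an idle poll)
        consumer.seekToBeginning(0);                          // request only
        System.out.println(consumer.lastResolvedPosition(0)); // 2 (not reset yet)
        System.out.println(consumer.poll());                  // [a, b] (reset applied by this poll)
    }
}
```

In this model the seek issued after the empty poll changes nothing by itself; the records are only re-read by the following poll, which matches the order of the two log messages above.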