I have a topic "oranges" with 10 partitions and 2 consumers in 1 consumer group. I am using Spring Kafka.

I need to re-read the data from time to time, so I need to reset the offsets. My listener implements ConsumerSeekAware, and in onPartitionsAssigned() I simply call callback#seekToBeginning. This appears to work, as in the log I see messages from the Kafka client API (2.3.1) saying:

Resetting offset for partition oranges-X to offset 0. This is logged for every partition.

However, effectively only the last partition (9) is reset and, from time to time, if I get lucky, the second one (1) too. None of the others are reset at all.

What is giving me a real headache is this: if I omit partition 9 from the list of partitions to be reset, all other partitions are reset and everything works as expected.

The code is very simple:

class ... implements ConsumerSeekAware {
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        ...
        callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
    }
    ...
}

Logs:

[19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.
Can you share your code or some logs? - Mickael Maison

Hi @MickaelMaison, I have updated the post. Would you like logs from after or from prior to the log statements above? - Martin Linha

Never heard of anything like this before. If you can provide a small, concise, complete sample that exhibits this behavior, I can take a look. - Gary Russell

Hi @GaryRussell, thanks for the reply. I will provide an example. I have already spent a couple of days on this. What I have found is that it happens only if AckMode is set to BATCH and enable.auto.commit is set to false. If I change it to true, it works as expected. Looks like there were some pending offset commits? The documentation of callback#seekToBeginning says: "Queue a seekToBeginning operation to the consumer. The seek will occur after any pending offset commits. The consumer must be currently assigned the specified partition." - Martin Linha

Which version are you using? That javadoc needs to be fixed; in versions since 1.3 the threading was changed (thanks to KIP-62). onPartitionsAssigned is called on the consumer thread from poll(), and the seeks are now done directly rather than being queued; the seeks are still queued if you save the callback passed to registerSeekCallback and call it from outside onPartitionsAssigned. - Gary Russell
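
For reference, the combination reported in the comments above (AckMode BATCH with enable.auto.commit set to false) might be expressed as follows. This is only a sketch that assumes Spring Boot style configuration in application.properties; the question does not say how the container is actually configured.

```properties
# Assumption: Spring Boot auto-configuration is in use (not confirmed by the question).
# The listener container commits offsets after each batch returned by poll() has been processed.
spring.kafka.listener.ack-mode=batch
# The Kafka client itself does not auto-commit; the container performs the commits.
spring.kafka.consumer.enable-auto-commit=false
```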

1 Answer


I can't reproduce your issue.

Here is my test Spring Boot application:

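import java.util.Map;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;

@SpringBootApplication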
public class So62465345Application extends AbstractConsumerSeekAware {


    private static final Logger LOG = LoggerFactory.getLogger(So62465345Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62465345Application.class, args);
    }

    @KafkaListener(id = "so62465345", topics = "so62465345")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62465345").partitions(10).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 9).forEach(i -> template.send("so62465345", i, null,
                System.currentTimeMillis() + ":foo:" + i));
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        try {
            Thread.sleep(5000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("Seeking on assignment");
        callback.seekToBeginning(assignments.keySet());
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        LOG.info("Seeking on idle");
        callback.seekToBeginning(assignments.keySet());
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=30000
spring.kafka.listener.poll-timeout=2000

I set a breakpoint in onIdleContainer and, using kafka-console-consumer, I see that the offsets are not actually reset until the next poll().

The message "Seeking to EARLIEST offset of partition so62465345-1" appears when we perform the seek, but "Resetting offset for partition so62465345-0 to offset 0" doesn't appear until we call poll() again (and then the offsets are actually reset).

So I do see that the seeks don't occur on the current poll, which returns 0 records, but the next poll starts from the beginning.
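
This matches the KafkaConsumer javadoc, which says that seekToBeginning evaluates lazily and only takes effect when poll() or position() is called. The sketch below is a toy model of that deferral using only the JDK. `LazySeekModel` and its methods are hypothetical names made up for illustration; they are not part of the Kafka or Spring Kafka API and do not reproduce the client's real internals.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: a seek marks a partition, and the next poll() applies the reset. */
class LazySeekModel {

    // Offset currently stored for each partition.
    private final Map<Integer, Long> offsets = new HashMap<>();

    // Partitions for which a seek-to-beginning was requested but not yet applied.
    private final Set<Integer> pendingReset = new HashSet<>();

    void setOffset(int partition, long offset) {
        offsets.put(partition, offset);
    }

    /** Records the request; the stored offset is left unchanged for now. */
    void seekToBeginning(int partition) {
        pendingReset.add(partition);
    }

    /** Returns the stored offset without triggering any pending reset. */
    long storedOffset(int partition) {
        return offsets.getOrDefault(partition, 0L);
    }

    /** Applies all pending resets, as the next poll does in the behaviour described above. */
    void poll() {
        for (int partition : pendingReset) {
            offsets.put(partition, 0L);
        }
        pendingReset.clear();
    }

    public static void main(String[] args) {
        LazySeekModel consumer = new LazySeekModel();
        consumer.setOffset(9, 17L);
        consumer.seekToBeginning(9);
        System.out.println("before poll: " + consumer.storedOffset(9)); // prints "before poll: 17"
        consumer.poll();
        System.out.println("after poll: " + consumer.storedOffset(9)); // prints "after poll: 0"
    }
}
```

In this model, as in the observation above, checking offsets between the seek and the following poll still shows the old values, so an external tool run at that moment will not see the reset yet.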