5
votes

I'd like to implement a stateful listener using the Spring Kafka API.

Given the following:

  • A ConcurrentKafkaListenerContainerFactory, with concurrency set to "n"
  • A @KafkaListener annotated method, on a Spring @Service class

Then "n" KafkaMessageListenerContainers will be created. Each one of these will have its own KafkaConsumer, and hence there will be "n" consumer threads - one per consumer.

When messages are consumed, the @KafkaListener method will be called using the same thread that polled the underlying KafkaConsumer. Since there is only on instance of the listener, this listener needs to be thread safe since there will be concurrent access from "n" threads.

I'd like not to think about concurrent access, and hold state in a listener which I know will only ever be accessed by one thread.

How can you create a separate listener per Kafka consumer using the Spring Kafka API?

1
Sorry, this is not the way StackOverflow works. Questions of the form "I want to do X, please tell me how to proceed" are considered off-topic. Please visit the help center and read How to Ask, and especially read Why is “Can someone help me?” not an actual question?Jim Garrison
Thanks. How do you suggest I improve it? Gary's answered it, so he must have understood it.A_M
I've had a go at editing the question. Is it now any better? Gary's answer equally applied to the first wording and the second wording so I don't think I've introduced any ambiguity between the question and the answer.A_M
IMO this question is entirely clear (and was originally). He's asking a very specific question.Gary Russell

1 Answers

3
votes

You are correct; there is one listener instance per container (regardless of whether is is configured as a @KafkaListener or MessageListener).

One work around is to use a prototype scoped MessageListener with n KafkaMessageListenerContainer beans (each having 1 thread).

Then, each container will get its own instance of the listener.

That is not possible with the @KafkaListener POJO abstraction.

It's generally better to use stateless beans, however.

EDIT

I found another work-around using a SimpleThreadScope...

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}

(Container concurrency is 3).

It works fine:

bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2

The only problem is the scope doesn't clean up on its own (e.g. when the container is stopped and the thread goes away. That may not be critical, depending on your use case.

To fix that, we'd need some help from the container (e.g. publish an event on the listener thread when it is stopped). GH-762.