You are correct; there is one listener instance per container (regardless of whether is is configured as a @KafkaListener
or MessageListener
).
One work around is to use a prototype scoped MessageListener
with n KafkaMessageListenerContainer
beans (each having 1 thread).
Then, each container will get its own instance of the listener.
That is not possible with the @KafkaListener
POJO abstraction.
It's generally better to use stateless beans, however.
EDIT
I found another work-around using a SimpleThreadScope
...
@SpringBootApplication
public class So51658210Application {
public static void main(String[] args) {
SpringApplication.run(So51658210Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
KafkaListenerEndpointRegistry registry) {
return args -> {
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
template.send("so51658210", 0, "", "foo");
template.send("so51658210", 1, "", "bar");
template.send("so51658210", 2, "", "baz");
};
}
@Bean
public ActualListener actualListener() {
return new ActualListener();
}
@Bean
@Scope("threadScope")
public ThreadScopedListener listener() {
return new ThreadScopedListener();
}
@Bean
public static CustomScopeConfigurer scoper() {
CustomScopeConfigurer configurer = new CustomScopeConfigurer();
configurer.addScope("threadScope", new SimpleThreadScope());
return configurer;
}
@Bean
public NewTopic topic() {
return new NewTopic("so51658210", 3, (short) 1);
}
public static class ActualListener {
@Autowired
private ObjectFactory<ThreadScopedListener> listener;
@KafkaListener(id = "foo", topics = "so51658210")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
this.listener.getObject().doListen(in, partition);
}
}
public static class ThreadScopedListener {
private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println(in + ":"
+ Thread.currentThread().getName() + ":"
+ this.hashCode() + ":"
+ partition);
}
}
}
(Container concurrency is 3).
It works fine:
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
The only problem is the scope doesn't clean up on its own (e.g. when the container is stopped and the thread goes away. That may not be critical, depending on your use case.
To fix that, we'd need some help from the container (e.g. publish an event on the listener thread when it is stopped). GH-762.