In a Spring Boot application, I want to use Spring Integration to read from a Kafka queue. The following is configured:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container);
kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = ... // set properties
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public DirectChannel receiver() {
return new DirectChannel();
}
@Autowired
private Resolver resolver;
@Bean
public EventDrivenConsumer getEventDrivenConsumer() {
return new EventDrivenConsumer(receiver(), resolver);
}
The Resolver
bean implements MessageHandler
.
Messages are received on the queue, but are not processed by the resolver bean.
The Spring Boot application is annotated as follows:
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
So there should be no auto configuration of Kafka beans.
The following is the error:
java.lang.NullPointerException: null
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
Debugging, it is apparent that in RecordMessagingMessageListenerAdapter
(top of the stacktrace), this.methodHandler
is null.
What is the correct way in Spring Integration to connect the channel to the bean which should process the messages?
KafkaMessageDrivenChannelAdapter
. That one injects anIntegrationRecordMessageListener
listener instead of mentioned aboveRecordMessagingMessageListenerAdapter
. Maybe you can share this Spring Boot project with us somewhere on the GitHub ? - Artem BilanNPE
stack trace? You also can turn on DEBUG logging level for theorg.springframework.kafka
to trace what and how is called. And maybe you'll figure out yourself who and how calls some listener container without ahandlerMethod
. May be you have@EnableKafka
and anything else? - Artem Bilancontainer()
and it is required by theKafkaMessageDrivenChannelAdapter
. - Artem Bilan