1
votes

In a Spring Boot application, I want to use Spring Integration to read from a Kafka queue. The following is configured:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = ... // set properties
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public DirectChannel receiver() {
    return new DirectChannel();
}

@Autowired
private Resolver resolver;

@Bean
public EventDrivenConsumer getEventDrivenConsumer() {
    return new EventDrivenConsumer(receiver(), resolver);
}

The Resolver bean implements MessageHandler.

Messages are received on the queue, but are not processed by the resolver bean.

The Spring Boot application is annotated as follows:

@SpringBootApplication(exclude = KafkaAutoConfiguration.class)

So there should be no auto configuration of Kafka beans.

The following is the error:

java.lang.NullPointerException: null
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]

Debugging, it is apparent that in RecordMessagingMessageListenerAdapter (top of the stacktrace), this.methodHandler is null.

What is the correct way in Spring Integration to connect the channel to the bean which should process the messages?

1
Looks like there is something else in your application not shown in this config. According your stack trace there is no any interaction with the KafkaMessageDrivenChannelAdapter. That one injects an IntegrationRecordMessageListener listener instead of mentioned above RecordMessagingMessageListenerAdapter. Maybe you can share this Spring Boot project with us somewhere on the GitHub ? - Artem Bilan
Artem, thanks. Have updated the question to point out that I am using @SpringBootApplication(exclude = KafkaAutoConfiguration.class) to explicitly exclude auto configuration of Kafka. - user1052610
Thanks. That still doesn't help sorry. Maybe there is something else in the logs around that NPE stack trace? You also can turn on DEBUG logging level for the org.springframework.kafka to trace what and how is called. And maybe you'll figure out yourself who and how calls some listener container without a handlerMethod. May be you have @EnableKafka and anything else? - Artem Bilan
Artem, thanks. There is no @EnableKafka. With debug I see: s.i.k.i.KafkaMessageDrivenChannelAdapter : started adapter, soon after o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]. I understand that KafkaMessageListenerContainer should not be getting instantiated, is that correct? - user1052610
No, it has to be because it is a bean container() and it is required by the KafkaMessageDrivenChannelAdapter. - Artem Bilan

1 Answers

0
votes

This was the resolution:

The parent declaration for the project is as follows:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

For the following dependency:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>${spring-integration-kafka.version}</version>
</dependency>

was previously using, for spring-integration-kafka.version:

3.0.1.RELEASE

Changed this to:

2.1.0.RELEASE

And everything works.

Without an explicit version for spring-integration-kafka, however, the project would not build because of missing classes.

One of the advantages of Boot is that it handles the dependency versions. Perhaps there should be a spring-boot-integration-kafka dependency, which would have prevented this issue.