1
votes

Using spring integration Kafka dsl, I wonder why listener not receive messages? But the same application If I replace spring integration DSL with a method annotated with KafkaListener is able to consume messages fine. What am I missing with DSL?

DSL code that does not consume:

@Configuration
@EnableKafka
class KafkaConfig {
    //consumer factory provided by Spring boot
    @Bean
    IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
        IntegrationFlows
                .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
                .configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
                .id("kafkaTopicListener").autoStartup(true)
        )

                .channel("logChannel")
                .get()
    }
}

logChannel (or any other channel I use), does not reflect inbound messages.

Instead of the above code, If I use plain listener, it works fine to consume messages.

@Component
class KafkaConsumer {
    @KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
    void inboundKafkaEvent(String message) {
        log.debug("message is {}", message)
    }
}

Both approaches uses same application.properties for Kafka consumer.

1
Have you tried to use a different groupId? If the consumer group has already consumed the messages when you have executed the second piece of code, then it won't have any messages to consume when trying to run the first piece of code.Giorgos Myrianthous
Above 2 code snippets do not co-exist. I commented DSL and added the KafkaListener code to just test topic. and yes, topic has messages and producers from another app keep pumping messages.suman j

1 Answers

1
votes

You are missing the fact that you use Spring Integration, but you haven't enable it in your application. You don't need to do that for Kafka though, since you are not going to consume it with the @KafkaListener. So, to enable Spring Integration infrastructure, you need to add @EnableIntegration on your @Configuration class: https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#configuration-enable-integration