I'm trying to implement a reactive Kafka consumer in my Spring Boot application, and I'm looking at these examples: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java

It looks like there is no Spring support for reactive Kafka yet.

I understand how Kafka listeners work with the non-reactive Kafka API in Spring: the simplest solution is to configure beans for ConcurrentKafkaListenerContainerFactory and ConsumerFactory, then use the @KafkaListener annotation, and voila.
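For reference, the non-reactive setup described above might look roughly like the sketch below. The broker address, group id, topic name, and class names are placeholders, not taken from a real project, and the kotlin-spring compiler plugin is assumed so that the configuration class is open.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.stereotype.Component

@EnableKafka
@Configuration
class KafkaConsumerConfig {

    // Plain (blocking) consumer factory; all values below are placeholders.
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> =
        DefaultKafkaConsumerFactory<String, String>(
            mapOf<String, Any>(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG to "my-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
            )
        )

    // Container factory that @KafkaListener methods use by default.
    @Bean
    fun kafkaListenerContainerFactory(
        consumerFactory: ConsumerFactory<String, String>
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.setConsumerFactory(consumerFactory)
        return factory
    }
}

@Component
class MyTopicListener {

    // Invoked by the listener container for every record on the (hypothetical) topic.
    @KafkaListener(topics = ["my-topic"])
    fun listen(message: String) {
        println("Received: $message")
    }
}
```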

But I'm not sure how to properly use reactive Kafka in Spring right now.

Basically, I need a listener for a topic. Should I create some kind of loop or scheduler of my own, or am I missing something? Can anyone share their knowledge and best practices?

Did you see the ReactiveKafkaConsumerTemplate in the Spring for Apache Kafka project: github.com/spring-projects/spring-kafka/blob/master/… ? - Artem Bilan
Reactive support for @KafkaListener is on the road map for next year. Right now, all we have is the lightweight wrapper that Artem mentioned. That said, managing partition offsets for a reactive (or any async) consumer is particularly difficult. - Gary Russell
@ArtemBilan thanks for the link, will look into that. - damncoffie
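
A minimal sketch of how the ReactiveKafkaConsumerTemplate mentioned in the comments could be used, assuming both spring-kafka and reactor-kafka are on the classpath. The broker address, group id, and topic name are placeholders, and `buildTemplate` is a hypothetical helper.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate
import reactor.kafka.receiver.ReceiverOptions

// Hypothetical helper: wraps reactor-kafka ReceiverOptions in the Spring template.
fun buildTemplate(): ReactiveKafkaConsumerTemplate<String, String> {
    val props = mapOf<String, Any>(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
        ConsumerConfig.GROUP_ID_CONFIG to "my-group",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
    )
    val options = ReceiverOptions.create<String, String>(props)
        .subscription(listOf("my-topic"))
    return ReactiveKafkaConsumerTemplate(options)
}

fun main() {
    buildTemplate()
        // receiveAutoAck() emits plain ConsumerRecords and acknowledges them automatically
        .receiveAutoAck()
        .doOnNext { record -> println("${record.key()} -> ${record.value()}") }
        // blocks the calling thread for as long as the stream is active (demo only)
        .blockLast()
}
```

With automatic acknowledgement there is no per-record offset control, which is the trade-off Gary Russell's comment points at.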

1 Answer

I don't have a ready solution yet, but I'm trying this (Kotlin code, Spring Boot). Someone published part of this code snippet here: https://github.com/reactor/reactor-kafka/issues/100

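@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        // concatMap subscribes to the Mono returned by processMyEvent and keeps
        // records in order; inside doOnNext that Mono would never be subscribed.
        .concatMap { record ->
            val myEvent = record.value()
            processMyEvent(myEvent)
                // acknowledge the offset only after processing completed successfully
                .doOnSuccess { record.receiverOffset().acknowledge() }
        }
        .doOnError {
            /* todo: an error terminates the stream, so log it and consider retrying */
        }
        .subscribe()
}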
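
The snippet above assumes a `kafkaReceiver` bean already exists. A rough sketch of how it could be defined with reactor-kafka follows; the broker address, group id, topic name, and the `ReactiveKafkaConfig` class name are placeholders, and String keys and values are an assumption.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions

@Configuration
class ReactiveKafkaConfig {

    // Exposes a reactor-kafka receiver subscribed to a single (hypothetical) topic.
    @Bean
    fun kafkaReceiver(): KafkaReceiver<String, String> {
        val props = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "my-group",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
        )
        val options = ReceiverOptions.create<String, String>(props)
            .subscription(listOf("my-topic"))
        return KafkaReceiver.create(options)
    }
}
```

The receiver does nothing until something subscribes to `receive()`, which is why the listener method above ends with `.subscribe()`.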

Look into other Stack Overflow questions. There is not much there, but they may give you some ideas.