0
votes

This is sort of sequel to this question. Can I use "plain" Apache Kafka Binder together with functional model? So far using annotation based configuration I mixed both, spring-cloud-stream-binder-kafka for simple consuming / producing and spring-cloud-stream-binder-kafka-streams for advanced stream processing in one application.

Functional model seems to be supported only by streams binder, and if I try to mix both approaches - annotation based for simple usage and functional for streams, stream binding is not registered.

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            simple-binding-in:
              destination: another-topic

public interface SimpleBinding {

    String INPUT = "simple-binding-in";

    @Input(INPUT)
    SubscribableChannel simpleIn();

}

@Component
public class SimpleListener {

    @StreamListener(SimpleBinding.INPUT)
    public void listen(@Payload SomeDto payload) {
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}

@EnableBinding(SimpleBinding.class) is present on configuration class. Is it preferred / supported to mix both as described or should I use streams-binder even for simple message consumption?

1

1 Answers

0
votes

For Kafka Binder you can and absolutely should use functional model and forget about StreamListener all together. This way it's going to be aligned with your KStream functional model.

spring.cloud:
        stream:
          function:
            definition: processStream
          bindings:
            processStream-in-0:
              destination:  my-topic
            listen-in-0:
              destination: another-topic

@Component
public class SimpleListener {

    @Bean
    public Consumer<SomeDto> listen() {
        return payload -> ...
    }
}

@Configuration
public class FunctionalStream {

    @Bean
    public Consumer<KStream<String>> processStream() {
        return eventStream -> eventStream.map()
    }
}