AFAIK the Spring Cloud Stream project is based on Spring Integration. Hence I was wondering if there is a nice way to resequence a subset of inbound messages before the StreamListener handler is triggered? Or do I need to assemble the whole IntegrationFlow from scratch using XML or Java DSL config from Spring Integration?

My use case is as follows. Most of the time I process inbound messages on a Kafka topic as they come. However, a few events have to be resequenced based on the CORRELATION_ID, SEQUENCE_NUMBER, and SEQUENCE_SIZE headers. In other words, I'd like to keep using StreamListener as much as possible and simply plug in a resequencing strategy for some events.

1 Answer

Yes, you would need to use Spring Integration for it. In fact, Spring Cloud Stream is effectively a binding framework only: it binds message handlers to message brokers via binders. The message handlers themselves are provided by the users. The @StreamListener annotation is pretty much the equivalent of Spring Integration's @ServiceActivator with a few extra features (e.g., conditional routing), but other than that it is just a message handler.

Now, as you alluded to, you can use Spring Integration (SI) to implement a message handler or an internal SI flow; that is normal and recommended for complex cases.
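To make the resequencing semantics concrete, here is a minimal plain-Java sketch of what such a handler has to do with the three headers from the question. It is not Spring Integration's resequencer and uses no Spring API; the class name SimpleResequencer and its accept method are hypothetical. It assumes that sequence numbers start at 1 within each correlation group, and it releases each message as soon as all of its predecessors have arrived.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified resequencer; not Spring Integration's implementation. */
final class SimpleResequencer<T> {

    /** Buffered payloads of one correlation group, ordered by sequence number. */
    private static final class Group<T> {
        final TreeMap<Integer, T> pending = new TreeMap<>();
        int nextToRelease = 1; // assumption: sequence numbers start at 1
    }

    private final Map<String, Group<T>> groups = new HashMap<>();

    /**
     * Accepts one message and returns the payloads that can now be released
     * in order for its correlation group (possibly none).
     */
    public synchronized List<T> accept(String correlationId, int sequenceNumber,
                                       int sequenceSize, T payload) {
        Group<T> group = groups.computeIfAbsent(correlationId, id -> new Group<>());

        // Ignore duplicates of sequence numbers that were already released.
        if (sequenceNumber >= group.nextToRelease) {
            group.pending.putIfAbsent(sequenceNumber, payload);
        }

        // Release the contiguous run that starts at the next expected number.
        List<T> released = new ArrayList<>();
        while (!group.pending.isEmpty()
                && group.pending.firstKey() == group.nextToRelease) {
            released.add(group.pending.pollFirstEntry().getValue());
            group.nextToRelease++;
        }

        // Forget the group once every message of the sequence has been released.
        if (group.nextToRelease > sequenceSize) {
            groups.remove(correlationId);
        }
        return released;
    }
}
```

A handler could read the three header values from each inbound message, pass them to accept, and process whatever comes back. The sketch deliberately leaves out what a real resequencer also needs: a persistent message store, timeouts for groups that never complete, and handling of messages that arrive after their group was removed.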

That said, we do provide out-of-the-box apps that implement certain EIP components. For example, we have an aggregator app, which you can use as a starting point for implementing a resequencer. Furthermore, given that we have an aggregator app but no resequencer, we would be glad to accept a contribution for it if you're interested. I hope this answers your question.