1
votes

I am writing an application that will process event messages (posted to the topic file-upload-completed). I have multiple endpoints that should consume these messages (metadata-reader and quota-checker), and for pragmatic reasons I would like to deploy these endpoints together in an aggregated package.

With Spring Cloud Stream, I could use spring.cloud.stream.bindings.file-upload-completed.group=metadata-reader to set the consumer group for the first endpoint; I would also like to process messages under the quota-check group, but the property-based configuration only permits one consumer group binding per message queue.

Is there a way to configure Spring Cloud Stream so that I can bind to a single message queue under multiple consumer groups in the same application?

2

2 Answers

3
votes

but the property-based configuration only permits one consumer group binding per message queue.

by per message queue, do you mean, per-binding?

Also, when you specify the property spring.cloud.stream.bindings.file-upload-completed.group=metadata-reader, the file-upload-completed here means the bound target name (ex: the channel name) and you can define as many number of channels and their bindings with the specific consumer group that bound to a specific destination (topic: file-upload-completed)

spring.cloud.stream.bindings.mychannel1.destination=file-upload-completed
spring.cloud.stream.bindings.mychannel1.group=metadata-reader

spring.cloud.stream.bindings.mychannel2.destination=file-upload-completed
spring.cloud.stream.bindings.mychannel2.group=metadata-reader

With the above configuration, you can perform operations such as join etc., something like this:

@StreamListener
 public void receive(@Input(Processor.INPUT) SubscribableChannel input1, 
 @Input("mychannel2") SubscribableChannel input2) {
// perform operations (join etc.,)
}
0
votes

AFAIK consumer group means singleton consumer in cluster. If both your endpoints are in the same app, no reason to extract new consumer group. You can just publish-subscribe an incoming message to all your endpoints internally.