This question is similar to "Spring Cloud Stream topic per message for different consumers", but the difference is that I want multiple sinks in one consumer Spring Boot application, and I want to do this with a RabbitMQ topic exchange (which is the default in Spring Cloud Stream). I cannot figure out the correct configuration, or else something is wrong in my code. I have 3 sinks/consumers. consumer1 is the default, and every message goes there.
**Updated as suggested by Garry**
Comment: my producer app has routing key = '*.events'. Producer application.yml:
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-exchange
      rabbit:
        bindings:
          output:
            producer:
              routing-key-expression: headers['*.events']
  application:
    name: publisher-service
server:
  port: 15010
Producer code snippet. Comment: the message is sent with routing key = "test.events". I am not sure about the second argument, but I am assuming it is the binding routing key (test1.events.billing), which means I want the message delivered to the billing consumer in addition to the default consumer.
source.output().send(MessageBuilder.withPayload(eventRequest.getEventMessage())
        .setHeader("*.events", "test1.events.billing")
        .build());
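As far as I understand it, the expression `headers['*.events']` just reads the header whose name is literally `*.events`, and the value of that header becomes the routing key. The `*` in the header name is not a wildcard at this point. Below is a minimal plain-Java sketch of that lookup. It does not use Spring at all; the class name `RoutingKeyFromHeaderSketch` and the method `resolveRoutingKey` are hypothetical and only illustrate my assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class RoutingKeyFromHeaderSketch {

    // Hypothetical stand-in for what the expression headers['<name>'] amounts to:
    // a plain map lookup by header name, with the value used as the routing key.
    static String resolveRoutingKey(Map<String, Object> headers, String headerName) {
        Object value = headers.get(headerName);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // Same header name and value as in my producer snippet.
        headers.put("*.events", "test1.events.billing");

        // Prints the routing key the message would be published with.
        System.out.println(resolveRoutingKey(headers, "*.events"));
    }
}
```

If this is right, the message above is published with routing key `test1.events.billing`, not `test.events`.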
Consumer configuration. Comment: I want 3 queues bound to the exchange "my-exchange". I am not sure whether this config is right. application.yml:
spring:
  cloud:
    stream:
      bindings:
        defaultconsumer:
          destination: my-exchange
          group: queue1
        billingconsumer:
          destination: my-exchange
          group: queue2
        messageconsumer:
          destination: my-exchange
          group: queue3
      rabbit:
        bindings:
          defaultconsumer:
            consumer:
              bindingRoutingKey: '*.events.#'
          billingconsumer:
            consumer:
              bindingRoutingKey: test1.events.billing
          messageconsumer:
            consumer:
              bindingRoutingKey: test2.events.messages
  application:
    name: subscriber-service
server:
  port: 15020
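To make the expected routing explicit, here is a small self-contained sketch of topic-exchange matching as I understand it: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. This is not RabbitMQ's implementation, only an illustration; the class name `TopicMatchSketch` is hypothetical. The binding keys are the three from the config above, and the routing key is the header value from my producer.

```java
import java.util.List;

final class TopicMatchSketch {

    // Returns true if an AMQP-style topic binding key matches the routing key.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: match only if the routing key is exhausted too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = j; next <= key.length; next++) {
                if (matches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            // Pattern still expects a word but the routing key has none left.
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            // '*' matches exactly one word; a literal must be equal.
            return matches(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String routingKey = "test1.events.billing";
        List<String> bindingKeys =
                List.of("*.events.#", "test1.events.billing", "test2.events.messages");
        for (String bindingKey : bindingKeys) {
            System.out.println(bindingKey + " -> " + matches(bindingKey, routingKey));
        }
    }
}
```

By this logic, only the bindings for defaultconsumer and billingconsumer should match `test1.events.billing`, and messageconsumer should not receive the message.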
Consumer code: IEventConsumer.java. Comment: I am not sure the code below is right.
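import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface IEventConsumer {
    String INPUT = "my-exchange";

    // With no value on @Input, the binding name is the method name.
    @Input
    SubscribableChannel defaultconsumer();

    @Input
    SubscribableChannel billingconsumer();

    @Input
    SubscribableChannel messageconsumer();
}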
EventConsumer.java. Comment: all I want from the code below is that the message should not be received by my messageconsumer. But in reality it goes through all of these methods.
@StreamListener("defaultconsumer")
public void subscribe1(EventMessage eventMessage) {
    logger.info(" DefaultEventConsumer received new event [" + eventMessage.toString() + "] ");
}

@StreamListener("billingconsumer")
public void subscribe2(EventMessage eventMessage) {
    logger.info(" billingEventConsumer received new event [" + eventMessage.toString() + "] ");
}

@StreamListener("messageconsumer")
public void subscribe3(EventMessage eventMessage) {
    logger.info(" messageEventConsumer received new event [" + eventMessage.toString() + "] ");
}
Apparently something is wrong above, and I don't see this working. Any ideas?