I am trying to create an event-based system for communication between services, using Apache Kafka as the messaging system together with Spring Cloud Stream Kafka.
I have written the methods of my receiver class as follows:
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
    logger.info("Received EmployeeCreatedEvent: " + payload);
}
This method is meant to handle only messages whose eventType header is EmployeeCreatedEvent.
@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
    logger.info("Received EmployeeTransferredEvent: " + payload);
}
This method is meant to handle only messages whose eventType header is EmployeeTransferredEvent.
@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
    logger.info("Received payload: " + payload);
}
This is the default method, which has no condition.
When I run the application, the methods annotated with the condition attribute are never called. Only the handleDefaultEvent method is called.
I send messages to this receiver application from the sending (source) application using the CustomMessageSource class below:
@Component
@EnableBinding(Source.class)
public class CustomMessageSource {

    @Autowired
    private Source source;

    public void sendMessage(String payload, String eventType) {
        Message<String> myMessage = MessageBuilder.withPayload(payload)
                .setHeader("eventType", eventType)
                .build();
        source.output().send(myMessage);
    }
}
I call this method from my controller in the source application as follows:
customMessageSource.sendMessage("Hello", "EmployeeCreatedEvent");
The customMessageSource instance is autowired as follows:
@Autowired
CustomMessageSource customMessageSource;
Basically, I would like to filter the messages received by the sink (receiver) application and handle each one according to its event type.
For this, I have used the @StreamListener annotation with the condition attribute so that each event type is handled by its own method.
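To make the behaviour I am after concrete, here is a minimal plain-Java sketch of header-based dispatch, with no Spring or Kafka involved. It is only an illustration, not how Spring Cloud Stream is implemented: the HeaderDispatchSketch class, the Handler type, and the eventTypeIs and dispatch methods are hypothetical names, and the sketch assumes that a listener without a condition matches every message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class HeaderDispatchSketch {

    // Hypothetical stand-in for a listener method and its optional condition.
    static final class Handler {
        final String name;
        final Predicate<Map<String, Object>> condition;

        Handler(String name, Predicate<Map<String, Object>> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    // Models a condition such as headers['eventType']=='EmployeeCreatedEvent'.
    static Predicate<Map<String, Object>> eventTypeIs(String expected) {
        return headers -> expected.equals(headers.get("eventType"));
    }

    // The three listeners from the question; the default one has no condition,
    // which this sketch ASSUMES means "always matches".
    static final List<Handler> HANDLERS = Arrays.asList(
            new Handler("handleEmployeeCreatedEvent", eventTypeIs("EmployeeCreatedEvent")),
            new Handler("handleEmployeeTransferredEvent", eventTypeIs("EmployeeTransferredEvent")),
            new Handler("handleDefaultEvent", headers -> true));

    // Returns the names of all handlers whose condition accepts the given headers.
    static List<String> dispatch(Map<String, Object> headers) {
        List<String> invoked = new ArrayList<>();
        for (Handler handler : HANDLERS) {
            if (handler.condition.test(headers)) {
                invoked.add(handler.name);
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("eventType", "EmployeeCreatedEvent");
        // prints [handleEmployeeCreatedEvent, handleDefaultEvent]
        System.out.println(dispatch(headers));
        // prints [handleDefaultEvent]
        System.out.println(dispatch(new HashMap<>()));
    }
}
```

The first call shows what I expect for a message sent with the eventType header set to EmployeeCreatedEvent. The second call, where no eventType header is present, reproduces the output I actually see in the receiver.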
I am using Spring Cloud Stream Chelsea.SR2.
Can someone help me resolve this issue?