I am trying to build an event-based system for communication between services, using Apache Kafka as the messaging system together with Spring Cloud Stream Kafka.

I have written the methods of my Receiver class as follows:

    @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
    public void handleEmployeeCreatedEvent(@Payload String payload) {
        logger.info("Received EmployeeCreatedEvent: " + payload);
    }

This method is meant to handle only EmployeeCreatedEvent messages.

    @StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
    public void handleEmployeeTransferredEvent(@Payload String payload) {
        logger.info("Received EmployeeTransferredEvent: " + payload);
    }

This method is meant to handle only EmployeeTransferredEvent messages.

    @StreamListener(target = Sink.INPUT)
    public void handleDefaultEvent(@Payload String payload) {
        logger.info("Received payload: " + payload);
    }

This is the default method.

When I run the application, the methods annotated with the condition attribute are never called. Only the handleDefaultEvent method is called.

I send messages to this receiver application from the sending/source app using the CustomMessageSource class below:

    @Component
    @EnableBinding(Source.class)
    public class CustomMessageSource {
        @Autowired
        private Source source;

        public void sendMessage(String payload, String eventType) {
            Message<String> myMessage = MessageBuilder.withPayload(payload)
                    .setHeader("eventType", eventType)
                    .build();
            source.output().send(myMessage);
        }
    }

I call this method from a controller in the source app as follows:

    customMessageSource.sendMessage("Hello", "EmployeeCreatedEvent");

The customMessageSource instance is autowired as follows:

    @Autowired
    CustomMessageSource customMessageSource;

Basically, I would like to filter the messages received by the sink/receiver application and handle each type accordingly.

To do this, I have used the @StreamListener annotation with the condition attribute to dispatch different events to different handlers.

I am using Spring Cloud Stream Chelsea.SR2.

Can someone help me resolve this issue?

1 Answer

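It seems the eventType header is not being propagated, so none of the conditions match and only the unconditional handler runs. Make sure you list your custom headers in spring.cloud.stream.kafka.binder.headers; see the Kafka binder properties in the reference documentation: http://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_properties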
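
As a minimal sketch, assuming the applications are configured through application.properties and that eventType is the only custom header you need, the setting might look like this. The property takes a comma-separated list, so further headers can be appended.

```properties
# Custom headers the Kafka binder should transport along with the payload.
# "eventType" is the header set by CustomMessageSource in the question.
spring.cloud.stream.kafka.binder.headers=eventType
```

I would set it at least in the source application, which is where the header is written; setting it in the receiver as well should be harmless. To confirm the header actually arrives, you could temporarily change handleDefaultEvent to accept a Message<String> and log message.getHeaders().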