I'm using Spring Cloud Stream 3.x in a Spring Boot 2.x application to consume messages from a Kafka topic.
I want a listener that consumes messages only when a custom header has a specific value, as described in the documentation:
@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
    LOGGER.info("Received: {}", message);
}
However, the listener is never invoked. If I remove the condition, I see the following in the log:
Received: ... SomeHeader: [B@1055e4af ...
It turns out that custom headers are left as raw Kafka byte arrays ([B@... is the default toString() output of a Java byte[]), so comparing the header against a string in the condition never matches.
Is some additional configuration needed, or am I missing something?
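To illustrate what I think is happening, here is a minimal plain-Java sketch with no Spring involved. The class and method names (HeaderConditionSketch, matchesRaw, matchesDecoded) are hypothetical and only for illustration, and I am assuming the header bytes are UTF-8 encoded text. It compares a byte[] header with the expected string directly, as the condition effectively does, and then again after decoding it:

```java
import java.nio.charset.StandardCharsets;

public class HeaderConditionSketch {

    // Roughly what the condition does today: the raw header object is
    // compared with a String. A byte[] is never equal to a String.
    static boolean matchesRaw(Object headerValue, String expected) {
        return expected.equals(headerValue);
    }

    // Decodes a byte[] header before comparing.
    // Assumption: the producer wrote the header as UTF-8 text.
    static boolean matchesDecoded(Object headerValue, String expected) {
        if (headerValue instanceof byte[]) {
            String decoded = new String((byte[]) headerValue, StandardCharsets.UTF_8);
            return expected.equals(decoded);
        }
        return expected.equals(headerValue);
    }

    public static void main(String[] args) {
        // Simulates the header as it arrives from Kafka: raw bytes.
        byte[] raw = "SomeHeaderValue".getBytes(StandardCharsets.UTF_8);

        System.out.println(matchesRaw(raw, "SomeHeaderValue"));     // prints "false"
        System.out.println(matchesDecoded(raw, "SomeHeaderValue")); // prints "true"
    }
}
```

If that is the cause, decoding inside the condition itself might be one workaround, for example condition = "new String(headers['SomeHeader'])=='SomeHeaderValue'". I have not verified that this works in my setup, so treat it as an assumption.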