I'm using Spring Cloud Stream 3.x in a Spring Boot 2.x application to consume messages from a Kafka topic.

I want a listener that consumes messages only when a custom header has a specific value, as described in the docs:

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
  LOGGER.info("Received: {}", message);
}

However, the listener is never invoked. If I remove the condition, I see the following in the log:

Received: ... SomeHeader: [B@1055e4af ...

It turns out that custom headers are left as raw Kafka byte arrays (hence the `[B@...` in the log), so the condition, which compares the header against a String, never matches.
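To illustrate the mismatch outside of Spring, here is a minimal plain-Java sketch. It is only an analogue of the condition, not how SpEL or the binder evaluates it: `RawHeaderDemo`, `conditionMatches` and `decodeHeader` are hypothetical names, and UTF-8 is an assumed header encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RawHeaderDemo {

    // Hypothetical stand-in for headers['SomeHeader']=='SomeHeaderValue'.
    static boolean conditionMatches(Map<String, Object> headers) {
        return "SomeHeaderValue".equals(headers.get("SomeHeader"));
    }

    // Hypothetical helper: returns a copy in which the named header,
    // if it is a byte[], is decoded to a String (UTF-8 is assumed).
    static Map<String, Object> decodeHeader(Map<String, Object> headers, String name) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object value = copy.get(name);
        if (value instanceof byte[]) {
            copy.put(name, new String((byte[]) value, StandardCharsets.UTF_8));
        }
        return copy;
    }

    public static void main(String[] args) {
        // The header as it arrives when left in raw form: a byte array.
        Map<String, Object> raw = new HashMap<>();
        raw.put("SomeHeader", "SomeHeaderValue".getBytes(StandardCharsets.UTF_8));

        // A byte[] is never equal to a String, so the check fails.
        System.out.println(conditionMatches(raw)); // false

        // Once the header is decoded to a String, the same check passes.
        System.out.println(conditionMatches(decodeHeader(raw, "SomeHeader"))); // true
    }
}
```

The same idea applies to the listener: the header has to reach the condition as a String rather than as a byte array.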

Is some additional configuration needed, or am I missing something?

1 Answer

After some digging through the sources and Stack Overflow, I found the following:

So I added a custom header mapper bean that maps my custom header to a String. The bean name matters: the binder looks up a bean named kafkaBinderHeaderMapper by default, so the spring.cloud.stream.kafka.binder.headerMapperBeanName property can be omitted:

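@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
    // "Raw" headers skip the usual conversion; a value of true means the
    // inbound header is mapped to a String instead of being left as byte[].
    headerMapper.setRawMappedHeaders(Map.of(
        "SomeHeader", true
    ));
    return headerMapper;
}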

That fixed the problem:

Received: ... SomeHeader: SomeHeaderValue ...

P.S. This looks like a bug in Spring Cloud Stream:

  1. It introduces its own header mapper implementation (BinderHeaderMapper), but that implementation doesn't support the conditional routing feature.
  2. The header mapper is subclassed in KafkaMessageChannelBinder. The behaviour added there is non-obvious and is lost when a custom header mapper is provided.