In @StreamListener way (non-functional) of receiving message there was a condition like this:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
What i'm doing right now:
spring.cloud.stream.function.definition=process
spring.cloud.stream.bindings.process-in-0.destination=my-topic
@Bean
public Consumer<Message<SomeDTO>> process() {
return message -> Mono.just(message)
.doOnNext(msg -> LOGGER.debug("Message received: {}", msg))
.filter(this::headerFilter)
.map(this::extractEvent)
.filter(this::payloadAttributeFilter)
.block(blockTimeout);
}
But i'm filtering inside the method and when I pass other headers, with another payload, spring doesn't know how to deserealize this message, and brings a com.fasterxml.jackson.core.JsonParseException even before process the message, because can't create "SomeDTO" like this example.
I want to find a way to filter for message header before the spring deserialize it's payload and crash.