
With the annotation-based (non-functional) @StreamListener approach to receiving messages, there was a condition like this:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")

What I'm doing right now:

spring.cloud.stream.function.definition=process
spring.cloud.stream.bindings.process-in-0.destination=my-topic

public Consumer<Message<SomeDTO>> process() {

        return message -> Mono.just(message)
                .doOnNext(msg -> LOGGER.debug("Message received: {}", msg))

But I'm filtering inside the method, so when a message arrives with different headers and a different payload, Spring doesn't know how to deserialize it and throws a com.fasterxml.jackson.core.JsonParseException before my function even processes the message, because the payload can't be converted to "SomeDTO".

I want to find a way to filter on a message header before Spring deserializes the payload and crashes.
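To make the goal concrete, here is a rough sketch of the behaviour I'm after, written in plain Java without Spring. Everything in it is a hypothetical stand-in: `RawMessage` plays the role of a message whose payload is still raw bytes, `SomeDTO` is reduced to a single field, `parse` is a toy deserializer in place of Jackson, and `filtered` is just a name I made up. The point is only the order of operations: the header is checked first, and the payload is converted only if the header matches.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class HeaderFirstFilter {

    // Hypothetical stand-in for a message whose payload has not been converted yet.
    record RawMessage(Map<String, Object> headers, byte[] payload) {}

    // Hypothetical, simplified stand-in for the real SomeDTO.
    record SomeDTO(String name) {}

    // Wraps a handler so that the "type" header is checked BEFORE the payload is
    // deserialized. Messages with a missing or different header are dropped
    // without ever reaching the deserializer.
    static <T> Consumer<RawMessage> filtered(String expectedType,
                                             Function<byte[], T> deserializer,
                                             Consumer<T> handler) {
        return message -> {
            if (!expectedType.equals(message.headers().get("type"))) {
                return; // not for this consumer: skip without touching the payload
            }
            handler.accept(deserializer.apply(message.payload()));
        };
    }

    // Toy deserializer (assumption, replaces Jackson): accepts only "name=<value>"
    // and throws on anything else, like a parser would on an unexpected payload.
    static SomeDTO parse(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        if (!text.startsWith("name=")) {
            throw new IllegalArgumentException("Cannot create SomeDTO from: " + text);
        }
        return new SomeDTO(text.substring("name=".length()));
    }

    public static void main(String[] args) {
        List<SomeDTO> received = new ArrayList<>();
        Consumer<RawMessage> process = filtered("bogey", HeaderFirstFilter::parse, received::add);

        // Matching header: payload is parsed and handled.
        process.accept(new RawMessage(Map.of("type", "bogey"),
                "name=abc".getBytes(StandardCharsets.UTF_8)));
        // Different header, unparseable payload: skipped, no exception.
        process.accept(new RawMessage(Map.of("type", "other"),
                "<xml/>".getBytes(StandardCharsets.UTF_8)));
        // No headers at all: also skipped.
        process.accept(new RawMessage(Map.of(),
                "garbage".getBytes(StandardCharsets.UTF_8)));

        System.out.println(received);
    }
}
```

In other words, I'm looking for the Spring Cloud Stream equivalent of this ordering for the functional model, where the check on headers['type'] happens before the payload is converted to SomeDTO.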

I tried the properties-based way of filtering, but it didn't work: the message still reaches the main Consumer, even when it has no headers.

