
With the annotation-based (non-functional) @StreamListener way of receiving messages, I could specify a condition like this:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")

What I'm doing right now:

spring.cloud.stream.function.definition=process
spring.cloud.stream.bindings.process-in-0.destination=my-topic

@Bean
public Consumer<Message<SomeDTO>> process() {
    return message -> Mono.just(message)
            .doOnNext(msg -> LOGGER.debug("Message received: {}", msg))
            .filter(this::headerFilter)
            .map(this::extractEvent)
            .filter(this::payloadAttributeFilter)
            .block(blockTimeout);
}

But I'm filtering inside the method, so when a message arrives with different headers and a different payload, Spring doesn't know how to deserialize it and throws a com.fasterxml.jackson.core.JsonParseException before the message is even processed, because it can't create the "SomeDTO" used in this example.

I want to find a way to filter on the message header before Spring deserializes its payload and crashes.
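
To make the goal concrete, here is a minimal sketch of the "check the header first, decode second" idea in plain Java. It is not Spring code: `RawMessage` is a hypothetical stand-in for a `Message<byte[]>`, and `filtered`, the decoder, and the handler are names made up for illustration. In a real application one would presumably declare the function as `Consumer<Message<byte[]>>` and decode with something like Jackson's `ObjectMapper.readValue(byte[], Class)`, assuming the binder then hands over the payload without converting it first.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class HeaderFirstFilter {

    /** Hypothetical stand-in for a Message<byte[]>: headers plus an undecoded payload. */
    static final class RawMessage {
        final Map<String, Object> headers;
        final byte[] payload;

        RawMessage(Map<String, Object> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    /**
     * Builds a consumer that runs the decoder only when headers['type']
     * equals expectedType. Other messages are dropped before decoding.
     */
    static <T> Consumer<RawMessage> filtered(String expectedType,
                                             Function<byte[], T> decoder,
                                             Consumer<T> handler) {
        return message -> {
            if (!expectedType.equals(message.headers.get("type"))) {
                return; // header mismatch: the payload is never decoded
            }
            handler.accept(decoder.apply(message.payload));
        };
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();

        // Toy decoder (an assumption): rejects anything that does not look like a JSON object.
        Function<byte[], String> decoder = bytes -> {
            String text = new String(bytes, StandardCharsets.UTF_8);
            if (!text.startsWith("{")) {
                throw new IllegalArgumentException("Cannot decode: " + text);
            }
            return text;
        };

        Consumer<RawMessage> process = filtered("bogey", decoder, handled::add);

        // Wrong header and undecodable payload: skipped without an exception.
        process.accept(new RawMessage(
                Collections.singletonMap("type", "other"),
                "not json".getBytes(StandardCharsets.UTF_8)));

        // Matching header: decoded and handled.
        process.accept(new RawMessage(
                Collections.singletonMap("type", "bogey"),
                "{\"id\":1}".getBytes(StandardCharsets.UTF_8)));

        System.out.println(handled);
    }
}
```

The point of the sketch is only the ordering: because the header check happens while the payload is still raw bytes, a message of another type can never trigger a parse error.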

Take a look at the spring-cloud-function docs: docs.spring.io/spring-cloud-function/docs/3.1.3/reference/html/… - dturanski
I tried to apply the properties-based way of filtering, but it didn't go well: I still reach the main Consumer, even when the message has no headers. - GabrieRabelo

1 Answer
