3
votes

I am using the Spring Cloud Stream Rabbit binder with Spring Cloud Function and defining listeners like this:

public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
    // some code
}

I also reroute failed messages to a DLQ. The problem arises when a fatal error such as org.springframework.messaging.converter.MessageConversionException happens. It is not processed by ConditionalRejectingErrorHandler as described in https://docs.spring.io/spring-amqp/reference/html/#exception-handling, and the message keeps cycling forever.

Is there a way to make this work with ConditionalRejectingErrorHandler?

Right now I work around the problem by using @ServiceActivator(inputChannel = "errorChannel") and handling the errors myself.

Dependencies:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-hateoas</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
</dependencies>

1 Answer

0
votes

We've long debated error handling and other features that we use for imperative functions and how they apply (or whether they can be applied at all) to reactive functions, and we tried a few different things, but unfortunately it all comes down to an impedance mismatch.

The approaches you are describing are based on operating on a single Message. That is the unit of work within imperative-style message handlers such as Function<String, String>. You use the reactive style, and by doing so you changed the unit of work from a single message in the stream to the entire stream.

In short:

- Function<?, ?> - unit of work is Message
- Function<Flux<?>, Flux<?>> - unit of work is the entire stream
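To make the distinction concrete, here is a minimal, dependency-free sketch. It is not binder code: java.util.stream.Stream stands in for Flux, and the names UnitOfWorkSketch, convert, runImperative, and runStream are hypothetical, as is the "rejected" list that plays the role of a DLQ. It only illustrates why a caller can reject one message in the imperative case but sees a single failure for the whole pipeline in the stream case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnitOfWorkSketch {

    // Hypothetical per-message conversion that fails on bad input.
    static String convert(String payload) {
        if (payload.startsWith("bad")) {
            throw new IllegalArgumentException("cannot convert: " + payload);
        }
        return payload.toUpperCase();
    }

    // Imperative style: the caller applies the function once per message,
    // so it can catch a failure and reject only that message.
    static List<String> runImperative(List<String> messages, List<String> rejected) {
        Function<String, String> handler = UnitOfWorkSketch::convert;
        List<String> out = new ArrayList<>();
        for (String message : messages) {
            try {
                out.add(handler.apply(message));
            } catch (RuntimeException e) {
                rejected.add(message); // stand-in for routing to a DLQ
            }
        }
        return out;
    }

    // Stream style: the caller applies the function once to the whole stream.
    // A failure surfaces only when the pipeline is consumed, and it ends the pipeline.
    static List<String> runStream(List<String> messages) {
        Function<Stream<String>, Stream<String>> handler =
                stream -> stream.map(UnitOfWorkSketch::convert);
        return handler.apply(messages.stream()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "bad-1", "b");
        List<String> rejected = new ArrayList<>();
        System.out.println(runImperative(input, rejected) + " rejected=" + rejected);
        try {
            runStream(input);
        } catch (IllegalArgumentException e) {
            System.out.println("whole stream failed: " + e.getMessage());
        }
    }
}
```

In the imperative variant the try/catch sits outside the function, which is roughly where a framework can place its own per-message handling. In the stream variant there is no per-message call for the caller to wrap.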

You can also observe this easily: a reactive function is invoked only once for the lifetime of the application, while an imperative function is invoked once for each arriving message. The reason I am saying this is that the framework-based approaches we use for imperative message handlers (functions) cannot be applied to reactive ones without causing side effects. Reactive developers generally understand this, especially given the richness of the reactive API, specifically with regard to error handling.

In any event, we'll update the documentation accordingly.
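In practice this means the error handling moves inside the function and is applied per element. The sketch below shows the shape of that idea under stated assumptions: java.util.stream.Stream again stands in for Flux so the example has no dependencies, and InStreamErrorHandlingSketch, parse, and the deadLetters list are hypothetical names. With Reactor, the analogous shape is usually a flatMap whose inner publisher carries an operator such as onErrorResume.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InStreamErrorHandlingSketch {

    // Hypothetical conversion step; throws NumberFormatException on bad payloads.
    static Integer parse(String payload) {
        return Integer.valueOf(payload.trim());
    }

    // The function is still applied once to the whole stream, but each element
    // is converted inside its own try/catch, so one bad payload is diverted
    // to deadLetters instead of failing the pipeline.
    static Function<Stream<String>, Stream<Integer>> foo(List<String> deadLetters) {
        return stream -> stream.flatMap(payload -> {
            try {
                return Stream.of(parse(payload));
            } catch (NumberFormatException e) {
                deadLetters.add(payload); // stand-in for publishing to a DLQ
                return Stream.empty();
            }
        });
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        List<Integer> result = foo(deadLetters)
                .apply(Arrays.asList("1", "oops", " 2 ").stream())
                .collect(Collectors.toList());
        System.out.println(result + " deadLetters=" + deadLetters);
    }
}
```

The design choice here is that the function itself decides what a failed element becomes (dropped, replaced, or forwarded elsewhere), since nothing outside the stream gets a per-message hook.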