I am using the Spring Cloud Stream Rabbit binder with Spring Cloud Function and defining listeners like this:
public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
    // some code
}
I also reroute failed messages to a DLQ. The problem is that when a fatal error such as org.springframework.messaging.converter.MessageConversionException occurs, it is not handled by ConditionalRejectingErrorHandler as described in https://docs.spring.io/spring-amqp/reference/html/#exception-handling, and the message keeps cycling forever.
Is there a way to make this work with ConditionalRejectingErrorHandler?
Right now I work around the problem by using @ServiceActivator(inputChannel = "errorChannel") and handling the errors myself.
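To make the expected behavior concrete, here is a minimal stand-alone sketch of the decision I would like to happen: if a conversion failure appears anywhere in the cause chain, the message is rejected without requeue (so it can go to the DLQ); otherwise it is requeued. This is only loosely modeled on the idea behind ConditionalRejectingErrorHandler and is not Spring's implementation. It uses no Spring classes, and the names FatalErrorSketch, ConversionFailure, Outcome, isFatal and decide are hypothetical, introduced only for illustration.

```java
// Stand-alone sketch: no Spring classes are used here.
class FatalErrorSketch {

    // Hypothetical stand-in for
    // org.springframework.messaging.converter.MessageConversionException.
    static class ConversionFailure extends RuntimeException {
        ConversionFailure(String message) {
            super(message);
        }
    }

    // What should happen to the failed message (assumed names, not Spring types).
    enum Outcome { REJECT_TO_DLQ, REQUEUE }

    // Walks the cause chain and reports whether any link is a conversion failure.
    static boolean isFatal(Throwable t) {
        for (Throwable cause = t; cause != null; cause = cause.getCause()) {
            if (cause instanceof ConversionFailure) {
                return true;
            }
        }
        return false;
    }

    // Fatal errors can never succeed on redelivery, so they are rejected
    // instead of being requeued.
    static Outcome decide(Throwable t) {
        return isFatal(t) ? Outcome.REJECT_TO_DLQ : Outcome.REQUEUE;
    }

    public static void main(String[] args) {
        Throwable wrapped = new IllegalStateException(
                "listener failed", new ConversionFailure("bad payload"));
        System.out.println(decide(wrapped));
        System.out.println(decide(new IllegalStateException("database unavailable")));
    }
}
```

In my application the conversion failure takes the REQUEUE path of this sketch instead of being rejected, which is why the same message is delivered again and again.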
Dependencies:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot.experimental</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-consul-config</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-hateoas</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
</dependencies>