0
votes

I'm trying to create a application-level error handler for failures during the processing inside SCS application using Kafka as a message broker. I know that SCS already provides the DLQ functionality, but in my case I want to wrap failed messages with a custom wrapper type (providing the failure context (source, cause etc.))

In https://github.com/qabbasi/Spring-Cloud-Stream-DLQ-Error-Handling you can see two approaches for this scenario: one is using SCS and the other one directly Spring Integration. (both are atm not working)

According to the current reference (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producing_and_consuming_messages) SCS will allow to publish error messages received from the Spring Integration error channel, but unfortunately this not the case, at least for me. Although the application logs the following upon startup

o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 2 subscriber(s).

1
Just to clarify; If I understand correctly you're asking if/how you can intercept Message going into the error channel so you can customize it. Is that correct?Oleg Zhurakousky
Also, I just noticed you have @EnableBinding(Processor.class) instead of @EnableBinding(CustomDlqMessageChannel.class). Is it an oversight? Using this interface as a parameter to @EnableBinding will trigger the creation of your channel, otherwise nothing will happen.Oleg Zhurakousky
You shouldn't use @StreamListener("errorChannel") - that is consuming from a binder destination; to capture messages sent to the errorChannel use @ServiceActivator(inputChannel = "errorChannel").Gary Russell

1 Answers

2
votes

You shouldn't use @StreamListener("errorChannel") - that is consuming from a binder destination; to capture messages sent to the errorChannel use @ServiceActivator(inputChannel = "errorChannel").

EDIT

There were several problems with your app...

  1. The new error handling code was added in version 1.3
  2. The autoCommitOnError is a kafka binder property
  3. You needed an @EnableBinding(CustomDlqMessageChannel.class)
  4. You don't really need @EnableIntegration - boot does that for you

See my commit here.

and...

$ kafka-console-producer --broker-list localhost:9092 --topic testIn
>foo

and...

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic customDlqTopic --from-beginning
?
contentType>"application/x-java-object;type=com.example.demo.ErrorWrapper"