I am having trouble handling exceptions thrown during message processing in Spring Cloud Stream (Elmhurst.RELEASE). My application throws an exception in the main processing method:
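import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    // Consumes from the input binding and publishes the result to the output binding.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String process(String message) {
        externalService();
        return message.toUpperCase();
    }

    // Simulates a failing external call.
    private void externalService() {
        throw new RuntimeException("An external call failed");
    }
}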
I cannot catch this exception on any error channel. From reading the documentation, I expected to receive an ErrorMessage on either the global "errorChannel" or the binding-specific "input.myGroup.errors" channel.
I have tried a Spring Integration listener:
@ServiceActivator(inputChannel="errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}
I have also tried a Spring Cloud Stream listener:
@StreamListener("errorChannel")
public ErrorMessage onError(ErrorMessage message) {
    return message;
}
Neither attempt worked. I have also tried various combinations of configuration settings such as "errorChannelEnabled: true" and "max-retries: 1", none of which let me catch an error thrown by my Processor (I checked with a breakpoint on "return message"). I don't even receive any error messages in the topic when I configure "bindings.error.destination: myErrorTopic" as suggested in the SCS documentation.
The only thing that works at all is "enableDlq: true" at the binder configuration level. However, this does not let me determine the original exception, because the message posted to the DLQ only carries a header with the full stack trace. I don't want to parse a stack trace to figure out the type or message of the original exception.
Is there a better approach I should be taking here? Or am I making some silly mistake? My overall goal is, whenever an exception occurs, to send a message containing the actual exception type and exception message to a DLQ.
Of course I could put try/catch statements around the whole of every Processor/Source/Sink method and then manually route to a different bound channel, but that detracts greatly from the value proposition of the SCS framework in my mind.
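To make that fallback concrete, here is a minimal plain-Java sketch with no Spring types involved. `GuardedHandler`, `guard`, and the list standing in for a bound error channel are all hypothetical names chosen for illustration, and returning null on failure is an assumption of the sketch, not framework behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not part of Spring Cloud Stream: wraps a handler so that
// any RuntimeException is reported to an error sink instead of propagating.
final class GuardedHandler {

    // Wraps 'handler'; on failure, passes "<exception class>: <message>" to
    // 'errorSink' and returns null to signal that there is no regular output.
    static Function<String, String> guard(Function<String, String> handler,
                                          Consumer<String> errorSink) {
        return input -> {
            try {
                return handler.apply(input);
            } catch (RuntimeException e) {
                errorSink.accept(e.getClass().getName() + ": " + e.getMessage());
                return null;
            }
        };
    }

    public static void main(String[] args) {
        // The list stands in for a bound error channel (assumption for this sketch).
        List<String> errors = new ArrayList<>();
        Function<String, String> process = guard(message -> {
            throw new RuntimeException("An external call failed");
        }, errors::add);

        System.out.println(process.apply("hello"));
        System.out.println(errors);
    }
}
```

Every handler would need to be wrapped this way, which is exactly the repetition I would like the framework to take care of.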
I found this example of custom DLQ handling as part of this earlier StackOverflow question, which would seem to meet my needs. However, it doesn't seem to work at all with SCS Elmhurst/2.0 and the Kafka binder, even after migrating the configuration to the updated SCS.
EDIT: I've added a GitHub repository reproducing my error, as copying the same core code as Gary's answer doesn't seem to work. I'm starting to wonder if it's a POM dependency, configuration, or Kafka binder issue. The repo uses the core code from Gary's answer because I think that makes the issue a bit simpler to see and debug.
EDIT AFTER GARY'S ANSWER: I've accepted Gary's answer as it solves my original problem (working around a current framework issue that occurs when there are binder environments in the config). However, the DLQ messages ended up being fairly unhelpful for my case. Once I got "errorChannel" working per Gary's answer, I subscribed to it and from there created a custom error message that I sent to a normal bound SCS channel.
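For anyone after the same result, the custom error message boils down to extracting the original exception type and message from whatever Throwable arrives on the error channel. The following is a rough plain-Java sketch of that idea, not the code from my project. `ErrorPayloads`, `rootCause`, `describe`, and the two map keys are hypothetical names, and it assumes the received Throwable is a wrapper whose cause chain leads back to the original exception.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns a (possibly wrapped) Throwable into a small,
// structured error payload so consumers need not parse a stack trace.
final class ErrorPayloads {

    // Walks the cause chain to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Builds a payload holding the root exception's class name and message.
    static Map<String, String> describe(Throwable t) {
        Throwable root = rootCause(t);
        Map<String, String> payload = new LinkedHashMap<>();
        payload.put("exceptionType", root.getClass().getName());
        payload.put("exceptionMessage", String.valueOf(root.getMessage()));
        return payload;
    }

    public static void main(String[] args) {
        // The outer exception stands in for the wrapper the framework adds (assumption).
        Throwable wrapped = new IllegalStateException("handler failed",
                new RuntimeException("An external call failed"));
        System.out.println(describe(wrapped));
    }
}
```

In a real listener the Throwable would come from the ErrorMessage payload, and the resulting map (or an equivalent object) would be sent to the bound channel.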