3
votes

I'm working on integrating Spring Integration with AWS SQS queue.

I have an issue when my method annotated with @ServiceActivator throws an exception. It seems that in such cases the message is removed from the queue anyway. I've configured MessageDeletionPolicy to ON_SUCCESS in SqsMessageDrivenChannelAdapter.

Here is my channel/adapter configuration https://github.com/sdusza1/spring-integration-sqs/blob/master/src/main/java/com/example/demo/ChannelConfig.java

I've tried doing the same using @SqsListener annotation and messages are not deleted as expected.

I've created a mini Spring Boot app here to demonstrate this issue: https://github.com/sdusza1/spring-integration-sqs

Please help :)

1
Any chances how to test locally without AWS? I really don't see how it can be possible. Looking into debug I see a proper deletionPolicy propagation.Artem Bilan
Would it help If I embedded ElasticMQ in this project and configured SQS client to use it ? Thx for reply :)Sebastian Dusza
Yes. Indeed. The point is that I don't have AWS account at the moment, but definitely can run and debug something isolated. Thanks for understanding!Artem Bilan
Wait. I see the issue. Will answer...Artem Bilan

1 Answers

4
votes

Your configuration is like this:

@Bean
public MessageProducerSupport sqsMessageDrivenChannelAdapter() {
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(amazonSqs, SQS_QUEUE_NAME);
    adapter.setOutputChannel(inboundChannel());
    adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
    adapter.setVisibilityTimeout(RETRY_NOTIFICATION_AFTER);
    return adapter;

}

Where the inboundChannel is like this:

 @Bean
    public QueueChannel inboundChannel() {
        return new QueueChannel();
 }

So, this is a queue, therefore async and the message from that queue is processed on a separate thread by the TaskScheduler which polls this kind of channel according your PollerMetadata configuration. In this case any errors in the consumer are thrown into that thread as well and don't reach the SqsMessageDrivenChannelAdapter for expected error handling.

This technically is fully different from your @SqsListener experience which is really called directly on the container thread, and therefore its error handling is applied.

Or you need to revise your logic how you would like to handle errors in that separate thread or just don't use a QueueChannel just after SqsMessageDrivenChannelAdapter and let it throw and handle errors in the underlying SQS Listener Container as it is in case of @SqsListener.