I am using Spring Cloud AWS (1.0.1.RELEASE) with Spring Boot to run an SQS consumer. The application runs fine, but when it loses its network connection (for instance, if I switch off Wi-Fi on the laptop it is running on), I see errors on the console and the application never recovers. It just hangs and does not reconnect once the network becomes available again. I have to kill it and restart it. How do I make it recover by itself?

// Spring Boot entry point: 
public static void main(String[] args) {
    SpringApplication.run(MyConsumerConfiguration.class, args);
}

// Message Listener (A different class)
@MessageMapping(value = "myLogicalQueueName" )
public void receive(MyPOJO object) {

}

The error I see on the console:

Exception in thread "simpleMessageListenerContainer-1" com.amazonaws.AmazonClientException: Unable to execute HTTP request: sqs.us-east-1.amazonaws.com
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:473)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:297)
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2422)
    at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1130)
    at com.amazonaws.services.sqs.AmazonSQSAsyncClient$23.call(AmazonSQSAsyncClient.java:1678)
    at com.amazonaws.services.sqs.AmazonSQSAsyncClient$23.call(AmazonSQSAsyncClient.java:1676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Hi Mohamed, do you mind sharing your project? I am struggling to get my Spring Boot app with Spring Cloud AWS to read from AWS SQS (stackoverflow.com/questions/37297688/…). Thank you! Francesco

1 Answer

I figured out why the SQS listener is not able to reconnect after the network connection is lost.

It appears to be a problem in the current Spring Cloud AWS implementation of org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer, specifically in its inner class AsynchronousMessageListener:

private class AsynchronousMessageListener implements Runnable {

    private final QueueAttributes queueAttributes;
    private final String logicalQueueName;

    private AsynchronousMessageListener(String logicalQueueName, QueueAttributes queueAttributes) {
        this.logicalQueueName = logicalQueueName;
        this.queueAttributes = queueAttributes;
    }

    @Override
    public void run() {
        while (isRunning()) {
            ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
            CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
            for (Message message : receiveMessageResult.getMessages()) {
                if (isRunning()) {
                    MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                    getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
                } else {
                    break;
                }
            }
            try {
                messageBatchLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

The code above is the Runnable that polls the SQS queue for messages on its own thread. Once the network connection drops, getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest()) fails with an UnknownHostException (surfaced as the AmazonClientException in the stack trace in the question). Nothing in run() catches it, so the exception propagates out of the while loop and the polling thread terminates. When the network connection is re-established later, no thread is left polling the queue, which is why the application never recovers.
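To illustrate the kind of change that would keep the poller alive, here is a minimal, self-contained sketch. It is not the Spring Cloud AWS code and not necessarily how the library will fix it: ResilientPoller, receiver, handler and backOffMillis are hypothetical names, and the Supplier merely stands in for the getAmazonSqs().receiveMessage(...) call. The idea is only that the receive call sits inside a try/catch within the loop, so a failed poll is logged and retried after a pause instead of killing the thread.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a polling loop that survives receive failures.
// Not the actual SimpleMessageListenerContainer implementation.
class ResilientPoller implements Runnable {

    // Stands in for getAmazonSqs().receiveMessage(...); an assumption for this example.
    private final Supplier<List<String>> receiver;
    // Stands in for dispatching a message to the listener method.
    private final Consumer<String> handler;
    // How long to wait after a failed poll before trying again.
    private final long backOffMillis;

    private volatile boolean running = true;

    ResilientPoller(Supplier<List<String>> receiver, Consumer<String> handler, long backOffMillis) {
        this.receiver = receiver;
        this.handler = handler;
        this.backOffMillis = backOffMillis;
    }

    void stop() {
        this.running = false;
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                for (String message : this.receiver.get()) {
                    this.handler.accept(message);
                }
            } catch (RuntimeException e) {
                // AmazonClientException is a RuntimeException, so it would land here.
                // Log and retry instead of letting the exception end the thread.
                System.err.println("Polling failed, retrying in " + this.backOffMillis + " ms: " + e.getMessage());
                try {
                    Thread.sleep(this.backOffMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and leave the loop on shutdown.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```

With a loop shaped like this, polls that fail while the network is down are simply retried, and message delivery resumes once receiveMessage succeeds again. The pause between retries avoids spinning at full speed during an outage.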

I have already raised an issue with Spring for this: https://github.com/spring-cloud/spring-cloud-aws/issues/82

Hope this explains it all.