3
votes

I have a camel route defined roughly as follows:

from("aws-sqs://my-queue")
.process(myProcessor)
.to(myEndpoint);

However, myProcessor relies on an upstream service that sometimes becomes unavailable or returns a "pending" response, so the processor throws an exception and the exchange is left on the queue. Camel then polls and fails successively until the message is DLQed.

What I would like to do is configure visibilityTimeout dynamically to implement an exponential back-off until the upstream service is available again. So instead of from("aws-sqs://my-queue?visibilityTimeout=30"), I would register an exception handler in my route and inside of it, do something like:

sqsClient.setVisibilityTimeout(Math.pow(2 * visibilityTimeout));

Is this possible? And as a bonus, is it possible to configure the visibility timeout for a specific message id (eg. so it doesn't affect the visibility timeout of other messages)?

2

2 Answers

0
votes

Yes, it's possible to change VisibilityTimeout for messages being processed.

From AWS docs:

When you receive a message for a queue and begin to process it, the visibility timeout for the queue may be insufficient (for example, you might need to process and delete a message). You can shorten or extend a message's visibility by specifying a new timeout value using the ChangeMessageVisibility action.

For example, if the timeout for a queue is 60 seconds, 15 seconds have elapsed, and you send a ChangeMessageVisibility call with VisibilityTimeout set to 10 seconds, the total timeout value will be the elapsed time (15 seconds) plus the new timeout value (10 seconds), a total of 25 seconds. Sending a call after 25 seconds will result in an error.

Note

The new timeout period will take effect from the time you call the ChangeMessageVisibility action. In addition, the new timeout period will apply only to the particular receipt of the message. The ChangeMessageVisibility action does not affect the timeout of later receipts of the message or later queues.

0
votes

AWS SQS provides the flexibility to change the visibilityTimeout at message level (when processing the message) by calling "ChangeMessageVisibility" method.

However the current visibility timeout of the message can't be retrieved to perform your desired operation (Math.pow(2 * visibilityTimeout));

ChangeMessageVisibility method call:

sqsClient.ChangeMessageVisibility(myEndpoint, myMessage.getReceiptHandle(), 60);

The above code adds a visibilityTimeout of 60 seconds to the elapsing timeout of the message.

(i.e) If the message has default queue level visibilityTimeout of 30 seconds left, the above code adds 60 seconds to it and increases the message unavailability for other consumers to 90 seconds.