
I have AWS infrastructure set up so that every update to a DynamoDB entry ends up in an SQS FIFO queue with deduplication enabled. I also have a test covering this scenario: I purge the queue, update DynamoDB, and check that those updates are received when polling the queue. (The queue can receive updates from other tests in the suite, so I purge it before running the test to avoid having to poll through a large number of messages before reaching the right ones.) The test is flaky: it sometimes fails because not all of the updates I sent are received from the queue.

The queue has only one consumer, which is the test I have written, so no other consumer could be taking these messages.

I checked the queue in the AWS console: when the test times out (after the configured TIMEOUT value), the queue is empty and does not contain the missing messages.

My queue configuration in CDK

public Queue createSqsQueue() {
    return new Queue(this, "DynamoDbUpdateSqsQueue", QueueProps.builder()
            .withContentBasedDeduplication(true)
            .withFifo(true)
            .withQueueName("DynamoDbUpdateSqsQueue.fifo")
            .withReceiveMessageWaitTime(Duration.seconds(20))
            .build());
}

My Receive Message Code

private void assertExpectedDynamoDbUpdatesAreReceived() {
    List<String> expectedDynamoDbUpdates = getExpectedDynamoDbUpdates();
    List<String> actualDynamoDBUpdates = newArrayList();
    boolean allDynamoDbUpdatesReceived = false;
    stopWatch.start();
    while (!allDynamoDbUpdatesReceived && stopWatch.getTime() < TIMEOUT ) {
        List<String> receivedDynamoDbUpdates =
                AmazonSQSClientBuilder.standard().receiveMessage(queueUrl).getMessages().stream()
                        .map(this::processAndDelete)
                        .collect(Collectors.toList());
        actualDynamoDBUpdates.addAll(receivedDynamoDbUpdates);
        if(actualDynamoDBUpdates.containsAll(expectedDynamoDbUpdates)){
            allDynamoDbUpdatesReceived= true;
        }
    }
    stopWatch.stop();
    assertThat(allDynamoDbUpdatesReceived).isTrue();
}

1 Answer


The issue was not in receiving the messages but in purging the queue. According to the purge queue documentation (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-purge-queue.html):

"The message deletion process takes up to 60 seconds. We recommend waiting for 60 seconds regardless of your queue's size."

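Updates sent while the purge was still in progress could be deleted along with the old messages. Simply adding a 60-second wait after the purge, before sending the updates, fixed the issue.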
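
For illustration, here is a minimal sketch of that fix as a test helper. The class name PurgeAndWait, the method purgeAndWait, and the constant PURGE_SETTLE_TIME are hypothetical names, not part of the original test. The purge is passed in as a Runnable so the sketch stays free of SDK dependencies; in the real test it would presumably wrap the SDK's purge call on the same client and queueUrl used in the question.

```java
import java.time.Duration;

class PurgeAndWait {

    // Upper bound for the purge to complete, taken from the documentation quoted above.
    static final Duration PURGE_SETTLE_TIME = Duration.ofSeconds(60);

    /**
     * Runs the purge action, then blocks for the settle time so that messages
     * sent afterwards are not caught by the purge that is still in progress.
     */
    static void purgeAndWait(Runnable purgeAction, Duration settleTime) {
        purgeAction.run();
        try {
            Thread.sleep(settleTime.toMillis());
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail fast rather than continue the test early.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the purge to settle", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the real purge call (assumption: in the actual test this
        // lambda would call the SQS client's purge operation for queueUrl).
        Runnable fakePurge = () -> System.out.println("purge requested");

        // A short wait keeps this demo quick; the real test would pass PURGE_SETTLE_TIME.
        purgeAndWait(fakePurge, Duration.ofMillis(100));
        System.out.println("safe to send updates");
    }
}
```

In the test setup, the call would go before the DynamoDB updates are made, at the cost of adding about a minute to the test run.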