I have AWS infrastructure set up so that every update to a DynamoDB entry ends up in an SQS FIFO queue with deduplication enabled. I also have a test covering this scenario: it purges the queue, updates DynamoDB, and then checks that those updates are received when polling the queue. (The queue can receive updates from other tests in the suite, so I purge it before running the test to avoid polling through a large number of unrelated messages before reaching the right ones.) The test is flaky: it sometimes fails because not all of the updates I sent are received from the queue.
The queue has only one consumer, which is the test I have written, so no other consumer is taking these messages.
When the test times out (after the configured TIMEOUT value), I check the queue in the AWS console: it is empty and does not contain the missing messages.
My queue configuration in CDK:

    public Queue createSqsQueue() {
        return new Queue(this, "DynamoDbUpdateSqsQueue", QueueProps.builder()
                .withContentBasedDeduplication(true)
                .withFifo(true)
                .withQueueName("DynamoDbUpdateSqsQueue.fifo")
                .withReceiveMessageWaitTime(Duration.seconds(20))
                .build());
    }
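
For context on what `withContentBasedDeduplication(true)` implies: per the SQS documentation, the queue derives the deduplication ID from a SHA-256 hash of the message body (not the attributes), and a message whose ID matches one accepted in the previous 5 minutes is accepted successfully but not delivered. The sketch below only imitates that rule locally to show its effect on repeated bodies. `DedupSketch`, `dedupId` and `accept` are hypothetical names, not SQS APIs, and the real service may differ in details.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

// Local imitation of content-based deduplication (assumption: not the real SQS implementation).
final class DedupSketch {
    // Deduplication interval documented for SQS FIFO queues: 5 minutes.
    static final long DEDUP_WINDOW_MS = 5 * 60 * 1000L;

    // Time (ms) at which each deduplication ID was last accepted for delivery.
    private final Map<String, Long> acceptedAt = new HashMap<>();

    // Hex-encoded SHA-256 of the body, standing in for the deduplication ID.
    static String dedupId(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
    }

    // Returns true if the message would be delivered, false if it is a duplicate within the window.
    boolean accept(String body, long nowMs) {
        String id = dedupId(body);
        Long first = acceptedAt.get(id);
        if (first != null && nowMs - first < DEDUP_WINDOW_MS) {
            return false; // same body seen less than 5 minutes ago
        }
        acceptedAt.put(id, nowMs);
        return true;
    }
}
```

Under this model, two updates that serialize to the same body within 5 minutes produce a single delivered message, including a body already sent by an earlier test or test run.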
My receive-message code:

    private void assertExpectedDynamoDbUpdatesAreReceived() {
        List<String> expectedDynamoDbUpdates = getExpectedDynamoDbUpdates();
        List<String> actualDynamoDBUpdates = newArrayList();
        // Build the client once; standard() alone returns a builder, not a client.
        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
        boolean allDynamoDbUpdatesReceived = false;
        stopWatch.start();
        // Poll until every expected update has been seen or TIMEOUT elapses.
        while (!allDynamoDbUpdatesReceived && stopWatch.getTime() < TIMEOUT) {
            List<String> receivedDynamoDbUpdates =
                    sqs.receiveMessage(queueUrl).getMessages().stream()
                            .map(this::processAndDelete)
                            .collect(Collectors.toList());
            actualDynamoDBUpdates.addAll(receivedDynamoDbUpdates);
            if (actualDynamoDBUpdates.containsAll(expectedDynamoDbUpdates)) {
                allDynamoDbUpdatesReceived = true;
            }
        }
        stopWatch.stop();
        assertThat(allDynamoDbUpdatesReceived).isTrue();
    }
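
The termination logic of the polling loop above can be separated from SQS so that it can be checked on its own. This is a minimal sketch, assuming a hypothetical `pollUntilAllReceived` helper that takes any batch supplier in place of the real receive call. `PollSketch` and its parameter names are illustrative and not part of the AWS SDK.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

final class PollSketch {
    /**
     * Calls receiveBatch repeatedly until every expected item has been seen
     * or timeoutMs has elapsed. Returns the expected items that never arrived;
     * an empty list means all of them were received.
     */
    static List<String> pollUntilAllReceived(Supplier<List<String>> receiveBatch,
                                             Collection<String> expected,
                                             long timeoutMs) {
        List<String> missing = new ArrayList<>(expected);
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        // With SQS long polling the supplier blocks while the queue is empty;
        // a supplier that returns immediately makes this loop spin until the deadline.
        while (!missing.isEmpty() && System.nanoTime() - deadline < 0) {
            for (String item : receiveBatch.get()) {
                missing.remove(item); // removes one matching occurrence, if any
            }
        }
        return missing;
    }
}
```

In the test, `receiveBatch` would wrap the receive-and-delete call, and asserting that the returned list is empty would also report which updates never arrived.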