I am exploring SNS FIFO topics with SQS FIFO queues. I created an SNS FIFO topic and an SQS FIFO queue, and subscribed the queue to the topic. According to the docs, with this setup every message published to the SNS FIFO topic should be fanned out to the SQS queue, but that is not happening. PublishResult#getMessageId() returns a value, so the publish call appears to succeed, yet the queue never contains any messages. Since SNS FIFO topics don't support email subscriptions, the only way I can verify this pub-sub setup is to poll the queue, and because the fan-out isn't happening the queue always appears empty.
The complete code block:
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.UUID;
class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);

        // Create the SNS client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sns.us-west-1.amazonaws.com",
                        "us-west-1")).build();

        // Create the SQS client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sqs.us-west-1.amazonaws.com",
                        "us-west-1")).build();

        // Create the SNS FIFO topic
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic", "true")
                .addAttributesEntry("ContentBasedDeduplication", "false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        // Create the dead-letter SQS FIFO queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        // Get the ARN of the dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        // Create the main SQS FIFO queue with a redrive policy pointing at the dead-letter queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue", "true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deadLetterQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        // Get the ARN of the main queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        // Subscribe the FIFO queue to the FIFO topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        // Publish 4 sample messages to the FIFO SNS topic
        for (int i = 0; i < 4; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        // Read ApproximateNumberOfMessages from the FIFO queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                .get("ApproximateNumberOfMessages");

        // Expectation: the SNS FIFO topic should have fanned out the 4 published messages to the SQS FIFO queue
        Assertions.assertEquals(4, Integer.parseInt(approximateNumberOfMessages));
    }
}
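One thing I am unsure about in the final assertion: ApproximateNumberOfMessages is only an approximate, eventually consistent value, and delivery from the topic to the queue is asynchronous, so a single read right after publishing might be too early even if fan-out works. Below is a minimal sketch of a retry helper the test could use instead of reading the attribute once. The class name `PollUntil`, the method `awaitValue`, and the timeout and interval values are all hypothetical, and the lambda in `main` is just a stand-in for the real `getQueueAttributes` call.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

public class PollUntil {

    /**
     * Reads a value repeatedly until it equals the expected one or the timeout elapses.
     * Returns the last value that was read, so the caller can assert on it.
     */
    public static int awaitValue(IntSupplier reader, int expected,
                                 Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int last = reader.getAsInt();
        while (last != expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
            last = reader.getAsInt();
        }
        return last;
    }

    public static void main(String[] args) {
        // Stand-in for the SQS attribute read: the value grows by one on each call.
        int[] counter = {0};
        int seen = awaitValue(() -> ++counter[0], 4,
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(seen);
    }
}
```

In the test above, the reader would wrap the `getQueueAttributes` call and parse `ApproximateNumberOfMessages`. In my case the queue stays empty no matter how long I wait, so this only rules out a timing issue.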
SNS Access policy (Permissions)
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}
SQS Access policy (Permissions)
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
What am I missing? Why are the messages not arriving in the SQS queue? Do I need to change the SQS queue permissions, for example to something like the policy below?
{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}
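A narrower alternative to the broad policy above, which I am also considering, would allow only the SNS service principal to call sqs:SendMessage on the queue, restricted to this one topic through an aws:SourceArn condition. Below is a minimal sketch that builds such a policy string in plain Java. The class name `QueuePolicySketch` and the helper `buildSnsToSqsPolicy` are hypothetical, the ARNs are placeholders, and I have not confirmed that a missing queue policy is actually the cause.

```java
public class QueuePolicySketch {

    /**
     * Builds an SQS access policy that lets the SNS service send messages
     * to the given queue, but only on behalf of the given topic.
     */
    public static String buildSnsToSqsPolicy(String queueArn, String topicArn) {
        return "{"
                + "\"Version\":\"2012-10-17\","
                + "\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
                + "\"Action\":\"sqs:SendMessage\","
                + "\"Resource\":\"" + queueArn + "\","
                + "\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":\"" + topicArn + "\"}}"
                + "}]}";
    }

    public static void main(String[] args) {
        // Placeholder ARNs; in the test these would be queueArn and topicArn.
        String policy = buildSnsToSqsPolicy(
                "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
                "arn:aws:sns:us-west-1:<account>:<topicName>.fifo");
        System.out.println(policy);
    }
}
```

If this is the right direction, I assume the resulting string would be set as the queue's "Policy" attribute, either at creation time with addAttributesEntry("Policy", policy) or afterwards with amazonSQS.setQueueAttributes(queueUrl, attributes), before publishing to the topic.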