0
votes

I am trying to explore SNS FIFO topics with SQS FIFO Queue, this what I simply tried. I created SNS FIFO topic and SQS FIFO queue and subscribed the FIFO queue to the FIFO topic. As per the docs, for the aforementioned setting, whenever we publish a message to SNS FIFO queue it should fan-out that message to SQS queue, but it is not happening. I am able to get PublishResult#getMessageId() means the publishing part is happening successfully but the queue doesn't have any messages in it. As the SNS FIFO topic doesn't support email protocol subscription, the only way available for me to assert this pub-sub architecture is, to poll messages from the queue. Because of the fan-out is not happening, the queue seems always empty.

The complete code block:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.UUID;

class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);
        //creating sns client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sns.us-west-1.amazonaws.com",
                        "us-west-1")).build();
        //creating sqs client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                "<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sqs.us-west-1.amazonaws.com",
                        "us-west-1")).build();

        //creating SNS topic
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic", "true")
                .addAttributesEntry("ContentBasedDeduplication", "false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        //creating dead-letter sqs queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        //getting ARN value of dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deleteQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //creating sqs queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue", "true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deleteQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        //getting ARN value of queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //Subscribe FIFO queue to FIFO Topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        //Publishing 4 sample message to FIFO SNS Topic
        for (int i = 0; i < 5; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        //Getting ApproximateNumberOfMessages no of messages from the FIFO Queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                                                     .get("ApproximateNumberOfMessages");

        //My expectation here is SNS FIFO topic should have fanout the 4 published message to SQS FIFO Queue
        Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages));
    }
}

SNS Access policy (Permissions)

{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}

SQS Access policy (Permissions)


{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

What am I missing? why the messages are not present in the SQS queue. is there anything I should do with the SQS Queue permission as below?

{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

1

1 Answers

1
votes

Sharing my answer for posterity, as suspected the actual issue is related to Access Policy when we create FIFO SNS queue and subscribe it to the SQS FIFO queue with AWS SDK V1, the default Access policy will be as below

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

The above access policy will be the same even I tried to create SQS FIFO queue with AWS SDK v2 link. So when I manually change the access policy as below, the issue has been resolved and the FIFO SNS topic fan-out happening as specified:

{
  "Statement": [
    {
      "Action": [
        "sqs:*"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

Code block to add the above Access policy for every FIFO queue:

Policy policy = new Policy().withStatements(
        new Statement(Statement.Effect.Allow)
                .withPrincipals(Principal.AllUsers)
                .withResources(new Resource(queueArn))
                .withActions(SQSActions.AllSQSActions));

Map<String, String> policyQueueAttributes = new HashMap<>();
policyQueueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, policyQueueAttributes));

Added the above code block after creating the SQS FIFO queue solved the issue eventually.