1
votes

There is an official examples of MassTransit with SQS. The "bus" is configured to use SQS (x.UsingAmazonSqs). The receive endpoint is an SQS which in turn subscribed to an SNS topic. However there is no example how to Publish into SNS.

  1. How to publish into SNS topic?
  2. How to configure SQS/SNS to use http, since I develop against localstack?

AWS sdk version:

var cfg = new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://localhost:4566", UseHttp = true };

Update:

After Chris's reference and experiments with configuration I came up with the following for the 'localstack' SQS/SNS. This configuration executes without errors and Worker gets called, and publishes a message to a bus. However consumer class is not triggered and doesn't seem messages end up in the queue (or rather topic).

public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.AddConsumer<MessageConsumer>();
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);

            h.EnableScopedTopics();
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s =>
            {
            });
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();
 

Update 2:

When I look at sns subscriptions I see that the first which was created and subscribed manually through aws cli has a correct Endpoint, while the second that was created by MassTransit library has incorrect one. How to configure Endpoint for the SQS queue?

$ aws --endpoint-url=http://localhost:4566 sns list-subscriptions-by-topic --topic-arn "arn:aws:sns:us-east-1:000000000000:deal-topic"
{
    "Subscriptions": [
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:c804da4a-b12c-4203-83ec-78492a77b262",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "http://localhost:4566/000000000000/deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        },
        {
            "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:deal-topic:b47d8361-0717-413a-92ee-738d14043a87",
            "Owner": "",
            "Protocol": "sqs",
            "Endpoint": "arn:aws:sqs:us-east-1:000000000000:deal_queue",
            "TopicArn": "arn:aws:sns:us-east-1:000000000000:deal-topic"
        }

Update 3:

I've cloned the project and ran some unit tests of the project for AmazonSQS bus configuration, consumers don't seem to work. enter image description here

When I list subscriptions after the test run I can tell that Endpoints are incorrect.

...
{
    "SubscriptionArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage:e16799c2-9dd3-458d-bc28-52a16d646de3",
    "Owner": "",
    "Protocol": "sqs",
    "Endpoint": "arn:aws:sqs:us-east-1:000000000000:input_queue",
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:MassTransit_TestFramework_Messages-PongMessage"
},
...

Could it be that AmazonSQS for localstack has a major bug?

It's not clear how to use library with 'localstack' sqs, how to point out to actual endpoint (QueueUrl) of an SQS queue.

2

2 Answers

1
votes

Looks like your configuration is setup properly to publish, but there are probably at least a few reasons I can think of why you are not receiving messages:

  1. Issue with the current version of localstack. I had to use 0.11.2 - see Localstack with MassTransit not getting messages
  2. You are publishing to a different topic. Masstransit will create the topic using the name of the message type. This may not match the topic you configured on the receive endpoint. You can change the topic name by configuring the topology - see How can I configure the topic name when using MassTransit SQS?
  3. Your consumer is not configured on the receive endpoint - see the example below
public static readonly AmazonSQSConfig AmazonSQSConfig = new AmazonSQSConfig { ServiceURL = "http://localhost:4566" };
public static AmazonSimpleNotificationServiceConfig AmazonSnsConfig = new AmazonSimpleNotificationServiceConfig {ServiceURL = "http://localhost:4566"};

...
services.AddMassTransit(x =>
{
    x.UsingAmazonSqs((context, cfg) =>
    {
        cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
        {
            h.Config(AmazonSQSConfig);
            h.Config(AmazonSnsConfig);
        });

        cfg.ReceiveEndpoint(queueName: "deal_queue", e =>
        {
            e.Subscribe("deal-topic", s => {});
            e.Consumer<MessageConsumer>();
        });
    });
});
  
services.AddMassTransitHostedService(waitUntilStarted: true);
services.AddHostedService<Worker>();

From what I see in the docs about Consumers you should be able to add your consumer to the AddMastTransit configuration like your original sample, but it didn't work for me.

0
votes

Whenever Publish is called in MassTransit, messages are published to SNS. Those messages are then routed to receive endpoints as configured. There is no need to understand SQS or SNS when using MassTransit with Amazon SQS/SNS.

In MassTransit, you create consumers, those consumers consume message types, and MassTransit configures topics/queues as needed. Any of the samples using RabbitMQ, Azure Service Bus, etc. are easily converted to SQS by changing UsingRabbitMq to UsingAmazonSqs (and adding the appropriate NuGet package).