2
votes

I am learning more about MassTransit in my free time. Please see the code below:

var serviceAddress = new Uri("rabbitmq://localhost/check-order-status");
var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
var response = await client.GetResponse<OrderStatusResult>(new { OrderId = id});

which I took from here: http://masstransit-project.com/MassTransit/usage/request-response.html

GetResponse sends a message to a RabbitMQ fanout exchange that has x consumers. Each consumer needs to send a reply to the publisher; however, the code above receives only one reply and then carries on. How can it receive x replies?

I have managed to do this with RabbitMQ (without MassTransit) like this: https://dotnetcodr.com/2014/05/12/messaging-with-rabbitmq-and-net-c-part-5-headers-and-scattergather/. However, I cannot work out how to do it with MassTransit.

Can an async method await multiple replies?

Update

Using a handler appears to work:

sbc.ReceiveEndpoint(host, myMessage.Id.ToString(), ep =>
{
    ep.Handler<MyMessage>(context =>
    {
        testlist.Add(context.Message);
        return Console.Out.WriteLineAsync($"Received: {context.Message.Name}");
    });
});

I can now access testlist inside the sender.


1 Answer

1
votes

I believe you should be able to achieve behaviour similar to the scatter/gather example by using Task.WhenAll(IEnumerable<Task>). The MassTransit request-response page has a section on this called "Composing multiple results".

Following the example code from MassTransit, suppose that you have multiple service addresses stored in a field:

ICollection<Uri> _serviceAddresses;

If you want to get an OrderStatusResult response from every service address, you can do it as follows:

var getResponseTasks = new List<Task<Response<OrderStatusResult>>>();
foreach (var serviceAddress in _serviceAddresses)
{
    // One request client per service address.
    var client = bus.CreateRequestClient<CheckOrderStatus>(serviceAddress);
    getResponseTasks.Add(client.GetResponse<OrderStatusResult>(new { OrderId = id }));
}

// Wait until every service has replied.
await Task.WhenAll(getResponseTasks);

Now you'll just have to unwrap the results:

var responses = new List<Response<OrderStatusResult>>();
foreach (var responseTask in getResponseTasks)
{
    // Each task has already completed, so this await returns immediately.
    var response = await responseTask;
    responses.Add(response);
}
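
As a side note, when the tasks all share one result type, Task.WhenAll already returns the results as an array in the same order as the input tasks, so the unwrap loop can be skipped. Here is a minimal, self-contained sketch of that behaviour. It does not use MassTransit at all: AskServiceAsync and the service names are hypothetical stand-ins for client.GetResponse<OrderStatusResult>(...), simulated with Task.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class GatherSketch
{
    // Hypothetical stand-in for a request/response call:
    // simulates one service replying after a short delay.
    static async Task<string> AskServiceAsync(string serviceName, int delayMs)
    {
        await Task.Delay(delayMs);
        return $"{serviceName}: shipped";
    }

    public static async Task<string[]> GatherAsync()
    {
        // Start all requests first so they run concurrently.
        var tasks = new List<Task<string>>
        {
            AskServiceAsync("service-a", 30),
            AskServiceAsync("service-b", 10),
        };

        // Task.WhenAll<TResult> completes once every task has completed and
        // yields the results in the order of the input tasks,
        // not in the order the replies arrived.
        return await Task.WhenAll(tasks);
    }

    public static async Task Main()
    {
        string[] replies = await GatherAsync();
        foreach (var reply in replies)
        {
            Console.WriteLine(reply);
        }
    }
}
```

With the real request client the same pattern would presumably read var responses = await Task.WhenAll(getResponseTasks); keep in mind that if any one task faults, the await throws.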