
I am having an issue when using different RabbitMQ hosts to publish and consume messages. Originally, both publishing and consuming happened on a single RabbitMQ host. A second host (on a different server) was then added, and I have been running into issues ever since. I have read the MultiBus documentation (https://masstransit-project.com/usage/containers/multibus.html) and tried to implement it so that I have one bus for producing messages and one bus for consuming messages. Please find the code below:

--- Startup.cs ---

services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbit1");

    x.AddConsumer<SomeConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.ConfigureConsumer<SomeConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransit<ISecondBus>(x =>
{
    var section = configuration.GetSection("Rabbit2");

    x.AddConsumer<SomeOtherConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.PrefetchCount = 4;
            e.UseMessageRetry(r => r.Interval(10, 100));

            e.ConfigureConsumer<SomeOtherConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransitHostedService();          

--- ISecondBus.cs ---

public interface ISecondBus : IBus
{
}

--- Consumer.cs ---

public class SomeConsumer : IConsumer<MessageObjectList>
{
    private readonly IBus _bus;

    public SomeConsumer(IBus bus)
    {
        _bus = bus;
    }

    public async Task Consume(ConsumeContext<MessageObjectList> context)
    {
        var message = context.Message;

        //Aggregate data

        var newList = new MessageObjectList
        {
            Message = message
        };

        var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
        await endpoint.Send(newList);
    }
}

The above code works fine and successfully consumes messages from Rabbit1. However, when it sends a message to Rabbit2, I can see that the queue is created, but no messages ever arrive in it.

See second consumer below:

--- Startup.cs ---

services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbitmq2");

    x.AddConsumer<SomeOtherConsumer>();

    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });

        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.ConfigureConsumer<SomeOtherConsumer>(context);

            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });

        cfg.ConfigureEndpoints(context);
    }));
});

services.AddMassTransitHostedService();

--- SomeOtherConsumer.cs ---

public class SomeOtherConsumer : IConsumer<MessageObjectList>
{
    public async Task Consume(ConsumeContext<MessageObjectList> context)
    {
        //Persist data to db
    }
}

The problem is that the queue on Rabbit2 is created but never gets populated, so the second consumer never receives any messages.

Any assistance would be appreciated.

It is unclear what _bus is in SomeConsumer. - Alexey Zimarev
My apologies @AlexeyZimarev I have edited the question. - Pierre van der Westhuizen
You declared and registered the second bus as ISecondBus, why are you injecting it as IBus? You get the first bus injected and your message gets published there. Chris' answer is exactly right. - Alexey Zimarev

1 Answer


If you want to send to the second bus from a consumer on the first bus, inject the second bus interface (ISecondBus) into that consumer. Injecting IBus gives you the first bus, so the message is sent to the first host instead.

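public class SomeConsumer : IConsumer<MessageObjectList>
{
    // ISecondBus is the bus registered via AddMassTransit<ISecondBus>(...), i.e. Rabbit2.
    private readonly ISecondBus _bus;

    public SomeConsumer(ISecondBus bus)
    {
        _bus = bus;
    }

    public async Task Consume(ConsumeContext<MessageObjectList> context)
    {
        var message = context.Message;

        //Aggregate data

        var newList = new MessageObjectList
        {
            Message = message
        };

        // The short "queue:" address is resolved against the host of the bus
        // it is requested from, so no connection string is needed here.
        var endpoint = await _bus.GetSendEndpoint(new Uri("queue:" + QUEUE_NAME));
        await endpoint.Send(newList);
    }
}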
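
To see why the injected interface decides which host receives the message, here is a minimal sketch using only Microsoft.Extensions.DependencyInjection. None of the types below are MassTransit types: IDemoBus, ISecondDemoBus, DemoBus, SecondDemoBus, OriginalConsumer and ForwardingConsumer are hypothetical stand-ins, and the host names "rabbit1" and "rabbit2" are placeholders. It assumes the container maps the plain bus interface to the first bus and the derived interface to the second one, which is how the multibus registration in the question behaves according to the comments above.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-ins for IBus and ISecondBus (not MassTransit types).
public interface IDemoBus { string Host { get; } }
public interface ISecondDemoBus : IDemoBus { }

public sealed class DemoBus : IDemoBus
{
    public DemoBus(string host) => Host = host;
    public string Host { get; }
}

public sealed class SecondDemoBus : ISecondDemoBus
{
    public SecondDemoBus(string host) => Host = host;
    public string Host { get; }
}

// Mirrors the consumer in the question: asks for the plain bus interface.
public sealed class OriginalConsumer
{
    private readonly IDemoBus _bus;
    public OriginalConsumer(IDemoBus bus) => _bus = bus;
    public string TargetHost => _bus.Host;
}

// Mirrors the consumer in the answer: asks for the second bus interface.
public sealed class ForwardingConsumer
{
    private readonly ISecondDemoBus _bus;
    public ForwardingConsumer(ISecondDemoBus bus) => _bus = bus;
    public string TargetHost => _bus.Host;
}

public static class Program
{
    public static ServiceProvider BuildProvider()
    {
        var services = new ServiceCollection();

        // One registration per bus interface, each bound to its own host.
        services.AddSingleton<IDemoBus>(new DemoBus("rabbit1"));
        services.AddSingleton<ISecondDemoBus>(new SecondDemoBus("rabbit2"));

        services.AddTransient<OriginalConsumer>();
        services.AddTransient<ForwardingConsumer>();

        return services.BuildServiceProvider();
    }

    public static void Main()
    {
        using var provider = BuildProvider();

        // Prints "rabbit1": the plain interface resolves to the first bus.
        Console.WriteLine(provider.GetRequiredService<OriginalConsumer>().TargetHost);

        // Prints "rabbit2": the derived interface resolves to the second bus.
        Console.WriteLine(provider.GetRequiredService<ForwardingConsumer>().TargetHost);
    }
}
```

The only difference between the two consumers is the constructor parameter type, which is the same change the answer makes to SomeConsumer.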