I am running into an issue when using different RabbitMQ hosts to publish and consume messages. Originally, publishing and consuming both happened on a single RabbitMQ host. A decision was made to add a second host (on a different server), and since then I have been running into problems. I have read the MultiBus documentation (https://masstransit-project.com/usage/containers/multibus.html) and tried to implement it so that I have one bus for producing messages and one bus for consuming messages. The code is below:
--- Startup.cs ---
services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbit1");
    x.AddConsumer<SomeConsumer>();
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });
        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.ConfigureConsumer<SomeConsumer>(context);
            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });
        cfg.ConfigureEndpoints(context);
    }));
});
services.AddMassTransit<ISecondBus>(x =>
{
    var section = configuration.GetSection("Rabbit2");
    x.AddConsumer<SomeOtherConsumer>();
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });
        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.PrefetchCount = 4;
            e.UseMessageRetry(r => r.Interval(10, 100));
            e.ConfigureConsumer<SomeOtherConsumer>(context);
            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });
        cfg.ConfigureEndpoints(context);
    }));
});
services.AddMassTransitHostedService();
--- ISecondBus.cs ---
public interface ISecondBus : IBus
{
}
--- Consumer.cs ---
public class SomeConsumer : IConsumer<MessageObjectList>
{
    private readonly IBus _bus;
    public SomeConsumer(IBus bus)
    {
        _bus = bus;
    }
    public async Task Consume(ConsumeContext<MessageObjectList> context)
    {
        var message = context.Message;
        //Aggregate data
        var newList = new MessageObjectList
        {
            Message = message
        };
        var endpoint = await _bus.GetSendEndpoint(new Uri(_configuration.GetConnectionString("Rabbit2") + "/" + QUEUE_NAME));
        await endpoint.Send(newList);
    }
}
The above code works fine and successfully consumes messages from Rabbit1. However, when it sends a message to Rabbit2, I can see that the queue is created, but no messages ever arrive in it.
The second consumer is shown below:
--- Startup.cs ---
services.AddMassTransit(x =>
{
    var section = configuration.GetSection("Rabbitmq2");
    x.AddConsumer<SomeOtherConsumer>();
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(section.GetValue<string>("Host"), host =>
        {
            host.Username(section.GetValue<string>("User"));
            host.Password(section.GetValue<string>("Password"));
        });
        cfg.ReceiveEndpoint(QUEUE_NAME, e =>
        {
            e.ConfigureConsumer<SomeOtherConsumer>(context);
            EndpointConvention.Map<MessageObjectList>(e.InputAddress);
        });
        cfg.ConfigureEndpoints(context);
    }));
});
services.AddMassTransitHostedService();
--- SomeOtherConsumer.cs ---
public class SomeOtherConsumer : IConsumer<MessageObjectList>
{
    public async Task Consume(ConsumeContext<MessageObjectList> context)
    {
        //Persist data to db
    }
}
The problem here is that the queue on Rabbit2 never gets populated (although it does get created), so the second consumer never receives any messages.
Any assistance would be appreciated.
_bus in SomeConsumer - Alexey Zimarev
ISecondBus, why are you injecting it as IBus? You get the first bus injected and your message gets published there. Chris' answer is exactly right. - Alexey Zimarev
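
To illustrate the point raised in the comment, here is a minimal sketch of SomeConsumer taking ISecondBus as its dependency instead of IBus, so the send goes through the bus connected to Rabbit2. It assumes a MassTransit version that supports both MultiBus and the `queue:` short address form. The `QueueName` value and the shape of `MessageObjectList` are hypothetical stand-ins for the question's QUEUE_NAME and message type, which are not shown in full. This is not taken from the referenced answer.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Marker interface for the second bus, as declared in the question.
public interface ISecondBus : IBus
{
}

// Hypothetical shape of the message type; the real class is not shown in the question.
public class MessageObjectList
{
    public MessageObjectList Message { get; set; }
}

public class SomeConsumer : IConsumer<MessageObjectList>
{
    // Hypothetical value standing in for QUEUE_NAME from the question.
    private const string QueueName = "some-queue";

    private readonly ISecondBus _secondBus;

    // Ask the container for ISecondBus. Injecting plain IBus resolves to the
    // first (default) bus, which is connected to Rabbit1.
    public SomeConsumer(ISecondBus secondBus)
    {
        _secondBus = secondBus;
    }

    public async Task Consume(ConsumeContext<MessageObjectList> context)
    {
        //Aggregate data
        var newList = new MessageObjectList
        {
            Message = context.Message
        };

        // The short address is resolved against the host of the bus it is
        // requested from, so no Rabbit2 connection string is needed here.
        var endpoint = await _secondBus.GetSendEndpoint(new Uri($"queue:{QueueName}"));
        await endpoint.Send(newList);
    }
}
```

With this change the Startup.cs registration can stay as it is, because `AddMassTransit<ISecondBus>` already registers ISecondBus in the container.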