I have been using Masstransit for my project as a consumer. The MassTransit service will consume messages from another service, the other one I have build currently running on NestJS and using RabbitMq to send the messages. The problem when I try to serialize Rabbitmq message which contains basic Properties like this:
But as the MassTransit will then wrap my messages with an interface I can't get values stored on the Properties. It is the "Reply-to" address which used to reply RabbitMq Messages. Is there any way I can get that value? Or how can I respond to the rabbitmq messages applying the "Direct Reply-to" feature?
Update 1
As Chris Patterson response I have tried to done something like this:
Startup.cs
public void ConfigureServices(IServiceCollection services)
{
services.AddControllersWithViews();
services.AddMassTransit(x =>
{
x.AddConsumer<NewUserConsumer>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.UseHealthCheck(provider);
cfg.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("profile_queue", ep =>
{
ep.PrefetchCount = 16;
//ep.UseMessageRetry(r => r.Interval(2, 100));
ep.ClearMessageDeserializers();
ep.UseRawJsonSerializer();
ep.ConfigureConsumer<NewUserConsumer>(provider);
});
}));
});
services.AddMassTransitHostedService();
}
NewUserConsumer.cs
public class NewUserConsumer : IConsumer<MsgContext>
{
public async Task Consume(ConsumeContext<MsgContext> context)
{
var data = context.Message.data;
var pattern = context.Message.pattern;
var id = context.Message.id;
Console.WriteLine("======> [Message Received]");
Console.WriteLine($"data:{data}");
Console.WriteLine($"pattern:{pattern}");
await context.RespondAsync<MsgContext>(new
{
data = $"Received : {id}",
pattern = "add-profile"
});
}
}
But I got this message:
fail: MassTransit.ReceiveTransport[0]
S-FAULT
rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext
07f80000-5d0a-0015-c6d0-08d8b1ee0c86Altsrc.MasstransitSample.EventContracts.MsgContext
I have noticed that the rabbitmq management showed Ready: 0, Unacked: 1, Total: 1 when I got this error but once the terminal close it will then show Ready: 1, Unacked: 0, Total: 1. Somehow my config were not correct or I have missed something ?.
Update 2
I have tried to use ep.UseMessageRetry(r => r.Interval(2,100));
and got this at console terminal:
======> [Message Received] data:post request body pattern:post request fail: MassTransit.ReceiveTransport[0] S-FAULT rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext 07f80000-5d0a-0015-3b01-08d8b2ba5551 Altsrc.MasstransitSample.EventContracts.MsgContext warn: MassTransit.ReceiveTransport[0] R-RETRY rabbitmq://localhost/profile_queue (null) MassTransit.Context.RetryConsumeContext<Altsrc.MasstransitSample.EventContracts.MsgContext> MassTransit.RabbitMqTransport.MessageNotConfirmedException: exchange:Altsrc.MasstransitSample.EventContracts:MsgContext => The message was not confirmed: Value of type 'UInt64' cannot appear as table value at MassTransit.RabbitMqTransport.Integration.BatchPublisher.<>c__DisplayClass8_0.<<Publish>g__PublishAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe'1.Send(ModelContext modelContext) at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe'1.Send(ModelContext modelContext) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe, CancellationToken cancellationToken) at MassTransit.Initializers.MessageInitializer'2.Send(ISendEndpoint endpoint, InitializeContext'1 context, Object input) at Altsrc.MasstransitSample.EventContracts.NewUserConsumer.Consume(ConsumeContext'1 context) in E:\SampleProjects\Altsrc.MasstransitSample\Altsrc.MasstransitSample.EventContracts\NewUserConsumer.cs:line 23 at MassTransit.Scoping.ScopeConsumerFactory'1.Send[TMessage](ConsumeContext'1 context, IPipe'1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter'2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext'1 context, IPipe'1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter'2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext'1 context, IPipe'1 next) at GreenPipes.Filters.RetryFilter'1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe'1 next)
ep.UseMessageRetry(r => r.Interval(2, 100));
and received a lot more. Pls see my update. – Nguyen An