0
votes

I have been using Masstransit for my project as a consumer. The MassTransit service will consume messages from another service, the other one I have build currently running on NestJS and using RabbitMq to send the messages. The problem when I try to serialize Rabbitmq message which contains basic Properties like this:

properties

But as the MassTransit will then wrap my messages with an interface I can't get values stored on the Properties. It is the "Reply-to" address which used to reply RabbitMq Messages. Is there any way I can get that value? Or how can I respond to the rabbitmq messages applying the "Direct Reply-to" feature?

Update 1 As Chris Patterson response I have tried to done something like this:

Startup.cs

public void ConfigureServices(IServiceCollection services)
{
        services.AddControllersWithViews();
        services.AddMassTransit(x =>
        {
            x.AddConsumer<NewUserConsumer>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                cfg.UseHealthCheck(provider);
                cfg.Host(new Uri("rabbitmq://localhost"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });
                cfg.ReceiveEndpoint("profile_queue", ep =>
                {
                    ep.PrefetchCount = 16;
                    //ep.UseMessageRetry(r => r.Interval(2, 100));
                    ep.ClearMessageDeserializers();
                    ep.UseRawJsonSerializer();
                    ep.ConfigureConsumer<NewUserConsumer>(provider);
                });
            }));
        });
        services.AddMassTransitHostedService();
}

NewUserConsumer.cs

public class NewUserConsumer : IConsumer<MsgContext>
{
    public async Task Consume(ConsumeContext<MsgContext> context)
    {
        var data = context.Message.data;
        var pattern = context.Message.pattern;
        var id = context.Message.id;

        Console.WriteLine("======>  [Message Received]");
        Console.WriteLine($"data:{data}");
        Console.WriteLine($"pattern:{pattern}");

        await context.RespondAsync<MsgContext>(new
        {
            data = $"Received : {id}",
            pattern = "add-profile"
        });
    }
}

But I got this message:

 fail: MassTransit.ReceiveTransport[0]
  S-FAULT 
  rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext 
  07f80000-5d0a-0015-c6d0-08d8b1ee0c86Altsrc.MasstransitSample.EventContracts.MsgContext

I have noticed that the rabbitmq management showed Ready: 0, Unacked: 1, Total: 1 when I got this error but once the terminal close it will then show Ready: 1, Unacked: 0, Total: 1. Somehow my config were not correct or I have missed something ?.

Update 2 I have tried to use ep.UseMessageRetry(r => r.Interval(2,100)); and got this at console terminal:

======> [Message Received] data:post request body pattern:post request fail: MassTransit.ReceiveTransport[0] S-FAULT rabbitmq://localhost/Altsrc.MasstransitSample.EventContracts:MsgContext 07f80000-5d0a-0015-3b01-08d8b2ba5551 Altsrc.MasstransitSample.EventContracts.MsgContext warn: MassTransit.ReceiveTransport[0] R-RETRY rabbitmq://localhost/profile_queue (null) MassTransit.Context.RetryConsumeContext<Altsrc.MasstransitSample.EventContracts.MsgContext> MassTransit.RabbitMqTransport.MessageNotConfirmedException: exchange:Altsrc.MasstransitSample.EventContracts:MsgContext => The message was not confirmed: Value of type 'UInt64' cannot appear as table value at MassTransit.RabbitMqTransport.Integration.BatchPublisher.<>c__DisplayClass8_0.<<Publish>g__PublishAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe'1.Send(ModelContext modelContext) at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransport.SendPipe'1.Send(ModelContext modelContext) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe, CancellationToken cancellationToken) at GreenPipes.Agents.PipeContextSupervisor'1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe'1 pipe, CancellationToken cancellationToken) at MassTransit.Initializers.MessageInitializer'2.Send(ISendEndpoint endpoint, InitializeContext'1 context, Object input) at Altsrc.MasstransitSample.EventContracts.NewUserConsumer.Consume(ConsumeContext'1 context) in E:\SampleProjects\Altsrc.MasstransitSample\Altsrc.MasstransitSample.EventContracts\NewUserConsumer.cs:line 23 at MassTransit.Scoping.ScopeConsumerFactory'1.Send[TMessage](ConsumeContext'1 context, IPipe'1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter'2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext'1 context, IPipe'1 next) at MassTransit.Pipeline.Filters.ConsumerMessageFilter'2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext'1 context, IPipe'1 next) at GreenPipes.Filters.RetryFilter'1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe'1 next)

1
There should be a lot more detail with the S-FAULT exception, which would help understand where the actual issue is occurring.Chris Patterson
Thanks again @ChrisPatterson but that were all I got. I have tried to use ep.UseMessageRetry(r => r.Interval(2, 100)); and received a lot more. Pls see my update.Nguyen An
Some message header is invalid and not supported: Value of type 'UInt64' cannot appear as table value - that's RabbitMQ saying a header value isn't allowed.Chris Patterson

1 Answers

1
votes

MassTransit has built-in support for reply_to and should send the response direct.

You can configure the RawJsonMessageSerializer on the receive endpoint to handle the message.

configurator.ClearMessageDeserializers();
configurator.UseRawJsonSerializer();

This will remove all the standard serializers and assume JSON by default, since you don't have a content_type property set to application/json.