0
votes

I'd like to send a message to an appropriate queue based on the routing key. Starting from the producer app, my code (the relevant parts) is:

var options = serviceProvider
    .GetService<IConfiguration>()
    .GetOptions<RabbitMqProducerOptions>("RabbitMqProducer");

foreach (var option in options?.Endpoints)
{
    var method = typeof(EndpointConvention).GetMethod("Map", new[] { typeof(Uri) });
    var type = Assembly.Load(option.Assembly).GetTypes().First(t => t.Name == option.Type);
    var genericMethod = method.MakeGenericMethod(new[] { type });

    genericMethod.Invoke(null, new[] { new Uri($"{options.Address}/{option.Name}") });
}

Bus.Factory.CreateUsingRabbitMq(cfg => {
    cfg.Host(options.Address);
    cfg.Send<CreateProducts>(x => x.UseRoutingKeyFormatter(context => context.Message.Platform));
});

The above is ok - it creates exchanges as I declared them in the configuration file (fanout exchanges, if that matters). Now the consumer configuration:

var options = serviceProvider.GetService<IConfiguration>().GetOptions<RabbitMqConsumerOptions>("RabbitMqConsumer");

Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(options.Address);

    foreach (var kvp in options.Endpoints)
    {
        cfg.ReceiveEndpoint("ingest-products", ep =>
        {
            ep.PrefetchCount = kvp.Value.PrefetchCount;
            ep.BindMessageExchanges = false;

            ep.UseMessageRetry(r => r.Interval(kvp.Value.RetryCount, kvp.Value.RetryInterval));
            ep.Bind("ingest-amazon-products", x => BindForEndpoint(x, kvp.Value.RoutingKey));
            BindExchange(ep, kvp.Value.RoutingKey, kvp.Value.Assembly, kvp.Value.Type);
            ep.ConfigureConsumers(serviceProvider);
        });
    }
});

Now, the code above works, but not in the way I intended, as messages without matching routing keys still get delivered to my consumers. I mean - if the kvp.Value.RoutingKey configuration value is X, and the producer produces a message with a routing key of Y, the consumer listening for X's will get a Y message. How to fix that?

1
Fanout? you should be using direct exchanges if you want to route by RoutingKey to specific endpoint instances. Look at Sample-Direct as an example, which you seem to be close but missing that exchange type specification. - Chris Patterson
@ChrisPatterson - but what code should I use? In the DirectClient you're invoking the ReceiveEndpoint method and setting the type inside, but your client is also a receiver of messages and mine is not. Should I do it anyway? - Marek M.
@ChrisPatterson - tbh, I was mostly using this doc: masstransit-project.com/advanced/topology/… - you don't do anything special on the producer side in the send topology configuration. - Marek M.
@ChrisPatterson - I modified the code so that now the producer is also invoking ReceiveEndpoint and it works. Could you please post an answer, I could accept? :) - Marek M.
Your producer has a receive endpoint? That doesn't seem right, at all. Producers should not be receiving messages, nor configuring queues that other services are using. - Chris Patterson

1 Answers

0
votes

After posting a bounty, I found out that I should use the Publish method of the BusFactoryConfigurator interface.