0
votes

I'm facing a problem where I want to use Rebus to subscribe to and handle messages in rabbitmq. There are multiple message types defined in a third party assembly and new message types will be added to that assembly on a regular basis.

I need to somehow make Rebus subscribe and handle all those message types and forward them (Publish) to another rabbitmq instance. My service is essentially forwarding messages and also adding a custom rebus header when doing so.

The issue is that I do not want to generate handler classes for each and every message type (since the functionality is the same irrespective of the message type). I also dont want to update my code (writing new handler classes) every time a new message type is added in the third party assembly.

I tried using TypeBuilder to dynamically creating message handler classes for every type found by reflection but it feels kind of messy so I hope there is another way?

Code below outlines what I was kind of hoping to achieve even though the code doesn´t compile.

public void SubscribeAndHandleMessages()
        {
            // These types will be determined runtime by using reflection but thats omitted for clarity
            var messageTypes = new List<Type>(){typeof(MessageA), typeof(MessageB)}; 

            var activator = new BuiltinHandlerActivator();

            Configure.With(activator)
                .Transport(t => t.UseRabbitMq(_rabbitConnectionString, "MyQueue"))
                .Start();

            //Subscribe and register handlers
            foreach (var type in messageTypes)
            {
                activator.Bus.Subscribe(type); //This works, I can see the queue subscribing to the correct topics
                activator.Handle<type>(async (bus, context, message) => //This doesnt work since type is not known at compile time
                {
                    //Forwarding to another rabbit instance, same handling for all types of messages
                });
            }
        }
1

1 Answers

0
votes

Once you've established the necessary subscriptions, you only need to be able to handle all kinds of messages that you receive.

The best way to do that with Rebus, is to avoid the ordinary message processing pipeline (deserialize => look up handlers => dispatch) and instead handle the message in its raw form (i.e. in its "transport message" form).

You can do that with Rebus' transport message forwarding capability. With it, a 100% generic message handler could look like this:

Configure.With(activator)
    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "router-tjek"))
    .Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
    {
        var headers = transportMessage.Headers; //< Dictionary<string, string>
        var body = transportMessage.Body;       //< byte[]

        // handle the message here, e.g.
        // by deserializing the body into a JObject,
        // storing the bytes in a database, or by
        // forwarding the message to another queue
        return // appropriate forward action here
    }))
    .Start();

You can read more about it here: Transport message forwarding