2
votes

I'm using Rebus and I want to introduce something like described in CQRS Journey at paragraph "Avoid processing events multiple times", but I cannot figure it out.

I configured Rebus to use SQL Server for Transport and MongoDB for Subscriptions and Sagas. The Routing is configured TypeBased and maps all the commands handlers' types to the queue configured in Transport.

 var bus = Configure.With(new SimpleInjectorContainerAdapter(container))
            .Logging(l => l.Trace())
            .Transport(t =>
            {
                t.UseSqlServer(connectionstring, "TestMessages", "messageQueueName");
            })
            .Routing(r => r.TypeBased()
                            .MapAssemblyOf<Assembly1.Commands.DoSomething>("messageQueueName")
                            .MapAssemblyOf<Assembly2.Commands.DoSomethingElse>("messageQueueName")
                             )
            .Sagas(s => s.StoreInMongoDb(db, (sagaType) =>
            {
                return sagaType.Name;
            }))
            .Subscriptions(s => s.StoreInMongoDb(db, "Subscriptions"))
            .Options(o =>
            {
                o.SetNumberOfWorkers(1);
                o.SetMaxParallelism(1);
                o.EnableSagaAuditing().StoreInMongoDb(db, "Snapshots");
            })
            .Start();

Now I should configure Rebus in a way that when a command Publish an event, this is replicated in as many separate topics (virtual or physical queues) as existing subscribers' types.

Something like:

bus.Subscribe<Assembly1.EventHandler1>("Assembly1.EventHandler1Queue").Wait();
bus.Subscribe<Assembly1.EventHandler2>("Assembly1.EventHandler2Queue").Wait();
bus.Subscribe<Assembly2.EventHandler1>("Assembly2.EventHandler1Queue").Wait();

Thanks for help.

1
you tagged the question with azure-servicebus-topics and you are referencing an article on how to use (among other things) Azure Service Bus topics to implement pub/sub.... but you have configured your bus to use SQL Server as its message queue - is that intentional?mookid8000
I just added that tag because I cannot insert "topic/topics" as a new tagilcorvo

1 Answers

2
votes

There's a few things that seem confusing with your question.

But I guess your fundamental question is how to be sure that each message is processed one time only by each subscriber.

The answer is pretty easy: Have a separate endpoint for each subscriber - this means that each subscriber will have its own input queue which messages get processed from, and which a failed message will be returned to.

You can then have as many or as few handlers in each subscriber as you want. All compatible handlers will be executed for each incoming message.

With Rebus, each invocation to Configure.With(...).(...).Start() will give you a separate endpoint - so in your case, I suggest you wrap the subscriber endpoint creation in a method, which you can then invoke like this:

var event1Subscriber = CreateSubscriber("subscriber_event1");
event1Subscriber.Subscribe<Event1>().Wait();

var event2Subscriber = CreateSubscriber("subscriber_event2");
event2Subscriber.Subscribe<Event2>().Wait();

var event3Subscriber = CreateSubscriber("subscriber_event3");    
event3Subscriber.Subscribe<Event3>().Wait();

// ...

where CreateSubscriber would then be something like this:

public IBus CreateSubscriber(string queueName)
{
    return Configure.With(GetContainerAdapter())
        .Transport(t => t.UseMsmq(queueName))
        .Start();        
}