I've been trying to come up with a demo of a website that uses MassTransit with RabbitMQ to post messages to a service running on Service Fabric as a Stateful service.

Everything was going fine. My client would post a message:

IBusControl bus = BusConfigurator.ConfigureBus();
Uri sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}" + $"{RabbitMqConstants.PeopleServiceQueue}");
ISendEndpoint endPoint = await bus.GetSendEndpoint(sendToUri);

await endPoint.Send<ICompanyRequest>(new {CompanyId = id });

My consumer in my Service Fabric service was defined like this:

        IBusControl busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
            {
                h.Username(RabbitMqConstants.UserName);
                h.Password(RabbitMqConstants.Password);
            });

            cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
            {
                e.Consumer<PersonInformationConsumer>();
            });

        });

        busControl.Start();

This does allow me to consume the message in my class, and I can process it fine. The problem comes when we want to use IReliableDictionary or IReliableQueue, or anything else that needs to reference the context that is available from the RunAsync function in the Service Fabric service.

So my question is: how can I configure MassTransit (if it is possible at all) to work within a stateful Service Fabric service with knowledge of the service context itself?

Many thanks in advance. Mike

Update: OK, I've made some progress on this. If I point the registration routine at my message consumer class, e.g.:

ServiceRuntime.RegisterServiceAsync("ServiceType", context => new PersonInformationConsumer(context)).GetAwaiter().GetResult();
ServiceEventSource.Current.ServiceTypeRegistered(Process.GetCurrentProcess().Id, typeof(PersonInformationConsumer).Name);

Then in my consumer class for my messages I can do the following:

    internal sealed class PersonInformationConsumer : StatefulService, IConsumer<ICompanyRequest>
{
    private static StatefulServiceContext _currentContext;

    #region Constructors

    public PersonInformationConsumer(StatefulServiceContext serviceContext) : base(serviceContext)
    {
        _currentContext = serviceContext;
    }

    public PersonInformationConsumer() : base(_currentContext)
    {
    }

I can now successfully call the service message:

ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);

The problem I have now is trying to store something in the IReliableDictionary. Doing this causes an "Object reference not set to an instance of an object" error :( ... any ideas would be appreciated (although I may not read them until the new year now!)

        public async Task Consume(ConsumeContext<ICompanyRequest> context)
    {

        ServiceEventSource.Current.ServiceMessage(this.Context, "Message has been consumed, request Id: {0}", context.Message.CompanyId);

        using (ITransaction tx = StateManager.CreateTransaction())
        {

            try
            {

                var myDictionary = await StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");

This is causing the error.... HELP! :)

I don't have the Service Fabric SDK installed, but seeing access to static members inside an async method gives me pause. Surely there is another way to access the reliable dictionary instances. - Chris Patterson

1 Answer


You'll need to do a bit more to get MassTransit and stateful services working together. There are a few issues to concern yourself with here.

Only the primary replica (the master) of a stateful partition can write or update state, so with n partitions there are n primaries; all the other replicas will throw exceptions when they try to write any state back. So you'll need to deal with this issue. On the surface it sounds easy, until you take into consideration that the primary can move around the cluster as the cluster is rebalanced. The default for general Service Fabric applications is to turn off processing on the secondary replicas and only run the work on the primary. This is all done by the RunAsync method (try it out: run 3 replicas of a stateful service with something noddy in the RunAsync method, then terminate the primary).

There is also partitioning of your data to consider. Because stateful services scale with partitions, you'll need a way of distributing data to separate endpoints on your service bus; maybe have a separate queue that only listens for a given partition range? Say you have a UserCreated message: you might split this on country, so UK goes to partition 1, US goes to partition 2, etc.
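As a rough illustration of the country split described above, the sender could derive a per-partition queue name before calling GetSendEndpoint. Everything in this sketch is hypothetical: the `PartitionQueueRouter` class, the country-to-partition table, and the `_partition_N` naming are assumptions for illustration only, not part of MassTransit or Service Fabric.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: maps a message's country code to a per-partition queue name.
public static class PartitionQueueRouter
{
    // Assumed mapping for illustration; a real service would derive this
    // from its own partition scheme.
    private static readonly Dictionary<string, int> CountryToPartition =
        new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase)
        {
            ["UK"] = 1,
            ["US"] = 2,
        };

    // Returns the queue that the replica owning this country's partition listens on.
    public static string QueueFor(string baseQueueName, string countryCode)
    {
        if (!CountryToPartition.TryGetValue(countryCode, out int partition))
        {
            throw new ArgumentException(
                $"No partition configured for country '{countryCode}'.", nameof(countryCode));
        }

        return $"{baseQueueName}_partition_{partition}";
    }
}
```

With this in place, `PartitionQueueRouter.QueueFor("people_service", "UK")` gives `people_service_partition_1`, and each partition's primary would create its receive endpoint on the matching queue name.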

If you just want to get something basic up and running, I'd limit it to one partition and just try putting your bus creation within RunAsync, shutting down the bus once cancellation is requested on the cancellation token.

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        IRabbitMqHost host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), h =>
        {
            h.Username(RabbitMqConstants.UserName);
            h.Password(RabbitMqConstants.Password);
        });

        cfg.ReceiveEndpoint(host, RabbitMqConstants.PeopleServiceQueue, e =>
        {
            // Pass in the stateful service context
            e.Consumer(c => new PersonInformationConsumer(Context));
        });

    });

    busControl.Start();

    try
    {
        // Keep RunAsync alive until Service Fabric asks us to stop;
        // the delay throws an OperationCanceledException on cancellation.
        await Task.Delay(Timeout.Infinite, cancellationToken);
    }
    finally
    {
        // Service Fabric wants us to stop
        busControl.Stop();
    }
}
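A possible variation on the snippet above, offered as a sketch rather than a verified fix: instead of making the consumer derive from StatefulService, keep it a plain class and hand it the running service's state manager. The likely reason for the null reference in the question is that a consumer instance created outside the Service Fabric runtime never has its state manager opened, though that is an assumption on my part. `ICompanyRequest` is the asker's own message contract and is not defined here; the counting logic and the `ToString()` key are purely illustrative.

```csharp
using System.Threading.Tasks;
using MassTransit;
using Microsoft.ServiceFabric.Data;
using Microsoft.ServiceFabric.Data.Collections;

// Sketch only: a plain consumer (not a StatefulService) that is given the
// state manager of the service instance that Service Fabric is actually running.
internal sealed class PersonInformationConsumer : IConsumer<ICompanyRequest>
{
    private readonly IReliableStateManager _stateManager;

    public PersonInformationConsumer(IReliableStateManager stateManager)
    {
        _stateManager = stateManager;
    }

    public async Task Consume(ConsumeContext<ICompanyRequest> context)
    {
        // Fetch (or create) the dictionary outside the write transaction.
        var myDictionary = await _stateManager
            .GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");

        using (ITransaction tx = _stateManager.CreateTransaction())
        {
            // Illustrative only: count how many requests were seen per company.
            string key = context.Message.CompanyId.ToString();
            await myDictionary.AddOrUpdateAsync(tx, key, 1, (k, count) => count + 1);

            // Nothing is persisted or replicated until the transaction commits.
            await tx.CommitAsync();
        }
    }
}
```

Inside RunAsync the endpoint registration would then become something like `e.Consumer(() => new PersonInformationConsumer(StateManager));`, so every message is handled against the state manager of the current primary.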