
I have an event-sourced system, and I am implementing an endpoint that rebuilds the read-side data stores from the events in the event store. However, I am running into what seem to be concurrency problems in how I process the events.

I decided to reuse my event handler code to process the events during the rebuild. In the system's normal state (no read-side DB rebuild in progress), my event handlers listen for the events they are subscribed to and update the projections accordingly. However, when I process the events through their event handlers inline, the final state of the read-side DB is inconsistent (if the rebuild even finishes, which it sometimes doesn't). I assume this means the handlers are executing out of order.

Should I not be using event handlers in this way? I figured that, since I am processing events, reusing the event handlers would be appropriate.


I am using MediatR for in-service messaging. All event handlers implement INotificationHandler.

Here is a sample of the code:

// Load the aggregate's full history from the event store.
IEnumerable<IEvent> events = await _eventRepo.GetAllAggregateEvents(aggId);
int eventNumber = 0;
foreach (var e in events)
{
    // Versions must be contiguous and start at 1.
    if (e.Version != eventNumber + 1)
        throw new EventsOutOfOrderException("Events were out of order while rebuilding DB");

    var ev = e as Event;

    // Publish each historic event to the event handlers that update the read DBs.
    switch (e.Type)
    {
        case EventType.WarehouseCreated:
            WarehouseCreated w = new WarehouseCreated(ev);
            await _mediator.Publish(w);
            break;
        case EventType.BoxCreated:
            BoxCreated b = new BoxCreated(ev);
            await _mediator.Publish(b);
            break;
        case EventType.BoxLocationChanged:
            BoxLocationChanged l = new BoxLocationChanged(ev);
            await _mediator.Publish(l);
            break;
    }
    eventNumber++;
}

I have already tried replacing the await keyword with a call to Wait().

Something like _mediator.Publish(bcc).Wait(). But blocking like this doesn't seem like a great idea, since there is async code behind the call. It also didn't work.

I have also tried queuing the event versions and having each event type case wait until its version is at the front of the queue before publishing the event.

Something like:

    case EventType.BoxContentsChanged:
        BoxContentsChanged bcc = new BoxContentsChanged(ev);
        while (eventQueue.Peek() != bcc.Version)
            continue;
        await _mediator.Publish(bcc);
        eventQueue.Dequeue();
        break;

This also didn't work.


Anyway, if anyone has any ideas on how to deal with this problem, I would be very appreciative. I would prefer not to duplicate all the async event handler code as synchronous code.

The theory of reprocessing all events to rebuild your read model is sound, but reality will mess up your plans. You would move a box from location 1 through locations 2, 3, 4, 5 and 6 before finishing, when all you care about is location 6. What happens when you have hundreds of thousands of events? The read model is just a projection of the data model, designed to make collecting data for the front end easier and faster, so it would be better to write a dedicated task that rebuilds the projection directly from the data model when necessary. - Brad Irby
As for the order of processing events, that’s a MediatR-specific question and I’m no expert. You may be better off in a MediatR-specific forum. - Brad Irby
I don't really understand why hundreds of thousands of events are an issue here. I am just rebuilding my read side inline. I suppose it could stall execution of my system for a while, but I would rather have that than let other commands come in and update the read model in the middle of the rebuild, leaving the read side in an inconsistent state. @BradIrby - Lamar

1 Answer


I guess the best way to do this is synchronously, to ensure consistency. This required me to duplicate some of my event handler logic in a Replay service and create synchronous repository methods. No race conditions now though, which is nice.
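
A minimal sketch of the shape such a Replay service can take. The StoredEvent record, the in-memory ReadModel and the Apply method are hypothetical stand-ins for illustration, not the actual event classes or repositories from the question. The only point is that each event is fully applied before the next one is read, so nothing can interleave.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event shape for illustration only.
public enum EventType { WarehouseCreated, BoxCreated, BoxLocationChanged }

public sealed record StoredEvent(int Version, EventType Type, string EntityId, string Payload);

// Hypothetical in-memory stand-in for the read-side store.
public sealed class ReadModel
{
    public HashSet<string> Warehouses { get; } = new();
    public Dictionary<string, string> BoxLocations { get; } = new();
}

public sealed class ReplayService
{
    // Rebuilds the projection by applying events one at a time, in version order.
    public ReadModel Rebuild(IEnumerable<StoredEvent> events)
    {
        var model = new ReadModel();
        var expectedVersion = 1;

        foreach (var e in events.OrderBy(x => x.Version))
        {
            // Fail fast on a gap or duplicate instead of building a wrong projection.
            if (e.Version != expectedVersion)
                throw new InvalidOperationException(
                    $"Expected version {expectedVersion} but got {e.Version}.");

            Apply(model, e); // synchronous: returns only after the model is updated
            expectedVersion++;
        }

        return model;
    }

    private static void Apply(ReadModel model, StoredEvent e)
    {
        switch (e.Type)
        {
            case EventType.WarehouseCreated:
                model.Warehouses.Add(e.EntityId);
                break;
            case EventType.BoxCreated:
                // Payload is assumed to carry the initial location.
                model.BoxLocations[e.EntityId] = e.Payload;
                break;
            case EventType.BoxLocationChanged:
                if (!model.BoxLocations.ContainsKey(e.EntityId))
                    throw new InvalidOperationException(
                        $"Box {e.EntityId} moved before it was created.");
                // Payload is assumed to carry the new location.
                model.BoxLocations[e.EntityId] = e.Payload;
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(e), e.Type, "Unhandled event type.");
        }
    }
}
```

Calling `new ReplayService().Rebuild(events)` with a created box followed by several location changes leaves only the last location in `BoxLocations`, because later events overwrite earlier ones in order.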