1
votes

The new support for Event Hub Riders in 7.0 plus the existing InMemoryRepository backing for Sagas looks like it could provide a straightforward means of creating aggregate states based on a stream of correlated messages, e.g. across all sensors in a Building). In this scenario, the Building's Identifier would be used as the CorrelationId of the Messages, the Saga, and as the PartitionKey of the EventData messages sent to the Event Hub, ensuring the same consuming service instance receives all messages for that device at a given time. Given the way Event Hub's rebalancing works, it can be assumed that at some point while this service is running, the service instance managing messages for a Partition will shift to a new host, which will start reading messages sent by the sensors in the building. At that moment:

  1. The new host does not know anything about the old host's processing. It just knows that it is now receiving messages for the Event Hub partition that includes that Building's messages.
  2. The devices sending the messages do not know anything about the transition in state aggregation responsibility "downstream of them" - they are still happily reporting new measurements as always.

The challenge this creates is: on the new service instance, we need a new Saga to be created to take over for the previous Saga, but the only thing that knows no Saga lives for a given entity is MassTransit: nothing on the new instance knows a sensor reading from Building A is the first one from Building A since this service instance took over tracking the aggregate Building A state. We thought this could be handled by marking the same Message (DataCollected) with both InitiatedBy and Orchestrates:

public class BuildingAggregator:
            ISaga,
            InitiatedBy<DataCollected>, //init saga on first DataCollected with a given CorrelationId seen
            Orchestrates<DataCollected> //then keep handling those in that saga
{
     //saga Consume methods
}

However, this throws the following exception when the BuildingAggregator receives its second DataCollected message with a given Guid:

Saga exception on receipt of MassTransitFW_POC.Program+DataCollected: The message cannot be accepted by an existing saga
   at MassTransit.Saga.Policies.NewSagaPolicy`2.MassTransit.Saga.ISagaPolicy<TSaga,TMessage>.Existing(SagaConsumeContext`2 context, IPipe`1 next)
   at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.<Send>d__4`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---

Is there another way of achieving this logic? Is this the "wrong way" to apply Sagas?

1
You could do it using the state machine syntax, I don't know that it is possible with the consumer-based saga approach.Chris Patterson
I see. This is basically because the syntax allows the same event to be used in the Initially() and During() blocks? I think I need to wrap my mind around Automatonymous a bit more to understand how to port this existing application to it (hence the appeal of Consumer Sagas), but worst-case this is probably done with a Custom Activity. Thank you @ChrisPatterson for looking at this! I will update this post if/when I get the pattern working as I wanted.user483679
You're welcome. And yes, include the event in both Initially() and During() blocks and it will handle new/existing instances.Chris Patterson

1 Answers

1
votes

As per Chris Patterson's comments on the question above, this is achievable with the state machine syntax:

Initially(
    When(DataCollected)
        .Then(f => _logger.LogInformation("Initiating Network Manager for Network: {NetworkId}", f.Data.NetworkId))
        .TransitionTo(Running));
During(Running,
    When(DataCollected)
        .Then(f => { // activities and state transitions }),
    When(SimulationComplete)
        .Then(f => _logger.LogInformation("Network {NetworkId} shutting down.", f.Instance.CorrelationId))
        .TransitionTo(Final));

Note how the DataCollected event is handled both in the Initially state transition and in a state transition set by the Initially condition.