The new support for Event Hub Riders in 7.0, combined with the existing InMemoryRepository backing for Sagas, looks like it could provide a straightforward way to build aggregate state from a stream of correlated messages (e.g. across all sensors in a Building). In this scenario, the Building's identifier would be used as the CorrelationId of the messages and of the Saga, and as the PartitionKey of the EventData messages sent to the Event Hub, ensuring that the same consuming service instance receives all messages for that Building at a given time. Given the way Event Hub rebalancing works, we can assume that at some point while this service is running, responsibility for a partition will shift to a new host, which will start reading the messages sent by the sensors in the building. At that moment:
- The new host does not know anything about the old host's processing. It just knows that it is now receiving messages for the Event Hub partition that includes that Building's messages.
- The devices sending the messages do not know anything about the transition in state aggregation responsibility "downstream of them" - they are still happily reporting new measurements as always.
The challenge this creates is: on the new service instance, a new Saga needs to be created to take over from the previous one, but only MassTransit knows that no Saga exists yet for a given entity. Nothing else on the new instance knows that a sensor reading from Building A is the first one from Building A since this instance took over tracking the aggregate Building A state. We thought this could be handled by marking the same message (DataCollected) with both InitiatedBy and Orchestrates:
public class BuildingAggregator :
    ISaga,
    InitiatedBy<DataCollected>, // init saga on first DataCollected seen with a given CorrelationId
    Orchestrates<DataCollected> // then keep handling those in that saga
{
    // saga Consume methods
}
However, this throws the following exception when the BuildingAggregator receives its second DataCollected message with a given Guid:
Saga exception on receipt of MassTransitFW_POC.Program+DataCollected: The message cannot be accepted by an existing saga
at MassTransit.Saga.Policies.NewSagaPolicy`2.MassTransit.Saga.ISagaPolicy<TSaga,TMessage>.Existing(SagaConsumeContext`2 context, IPipe`1 next)
at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at MassTransit.Saga.SendSagaPipe`2.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.<Send>d__4`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
Is there another way of achieving this logic? Is this the "wrong way" to apply Sagas?
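To make the intended behaviour concrete, here is a minimal sketch of the semantics we are after, written without MassTransit: the first message with a given CorrelationId seen on an instance creates the state, and every later message updates that same state. All names below (BuildingState, BuildingAggregates, the Value property on DataCollected) are hypothetical stand-ins for illustration only, not our real types and not MassTransit APIs.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the real DataCollected message.
// The Value property is an assumption made for this sketch.
public class DataCollected
{
    public DataCollected(Guid correlationId, double value)
    {
        CorrelationId = correlationId;
        Value = value;
    }

    public Guid CorrelationId { get; }
    public double Value { get; }
}

// Hypothetical aggregate state for a single building.
public class BuildingState
{
    public BuildingState(Guid correlationId)
    {
        CorrelationId = correlationId;
    }

    public Guid CorrelationId { get; }
    public int Count { get; private set; }
    public double Sum { get; private set; }

    public void Apply(DataCollected message)
    {
        Count++;
        Sum += message.Value;
    }
}

// Get-or-create keyed by CorrelationId: the first message seen on this
// instance creates the state, later messages update the same state.
public class BuildingAggregates
{
    private readonly ConcurrentDictionary<Guid, BuildingState> _states =
        new ConcurrentDictionary<Guid, BuildingState>();

    public BuildingState Handle(DataCollected message)
    {
        BuildingState state = _states.GetOrAdd(
            message.CorrelationId,
            id => new BuildingState(id));

        // Serialize updates to one building's state.
        lock (state)
        {
            state.Apply(message);
        }

        return state;
    }
}
```

With this sketch, calling Handle twice with the same CorrelationId returns the same BuildingState both times, with Count equal to 2 after the second call. That "create on first message, update on every later one" behaviour from a single message type is what we are trying to get from a Saga.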