I have a problem with InMemoryTestHarness and state machine saga. It looks like each published message are consumed twice by saga. If message are sent (not published) the problem does not occur. The problem started when I changed CorrelationId to custom field - ProcessId. Below simplified example exposing the problem.
Saga definition:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public State InProgress { get; private set; }
public Event<StartProcess> StartProcess { get; private set; }
public Event<ProcessStageFinished> StageFinished { get; private set; }
public Event<ProcessFinished> ProcessFinished { get; private set; }
public MySaga()
{
InstanceState(x => x.CurrentState);
Event(() => StartProcess, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => StageFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Event(() => ProcessFinished, e => e
.CorrelateBy(i => i.ProcessId, x => x.Message.ProcessId)
.SelectId(x => NewId.NextGuid()));
Initially(
When(StartProcess)
.Then(x => x.Instance.ProcessId = x.Data.ProcessId)
.TransitionTo(InProgress)
);
During(InProgress,
When(StageFinished)
.Then(x => x.Instance.Stage++)
);
During(InProgress,
When(ProcessFinished)
.Finalize()
);
}
}
public class MySagaState : SagaStateMachineInstance, ISagaVersion
{
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public string ProcessId { get; set; }
public string CurrentState { get; set; }
public int Stage { get; set; }
}
public record StartProcess(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessStageFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
public record ProcessFinished(Guid CorrelationId, string ProcessId) : CorrelatedBy<Guid>
{
}
And the xUnit test:
public class MySagaTests
{
InMemoryTestHarness Harness { get; }
IStateMachineSagaTestHarness<MySagaState, MySaga> SagaHarness { get; }
public MySagaTests()
{
var services = new ServiceCollection()
.AddMassTransitInMemoryTestHarness(config =>
{
config.AddSagaStateMachine<MySaga, MySagaState>(sagaConfig =>
{
sagaConfig.UseConcurrentMessageLimit(1);
sagaConfig.UseInMemoryOutbox();
})
.InMemoryRepository();
config.AddSagaStateMachineTestHarness<MySaga, MySagaState>();
});
var serviceProvider = services.BuildServiceProvider(true);
Harness = serviceProvider.GetRequiredService<InMemoryTestHarness>();
Harness.OnConfigureInMemoryReceiveEndpoint += config =>
{
config.ConfigureSagas(serviceProvider.GetRequiredService<IBusRegistrationContext>());
};
SagaHarness = serviceProvider.GetRequiredService<IStateMachineSagaTestHarness<MySagaState, MySaga>>();
}
[Fact]
public async Task TestMySaga()
{
string processId = "newProcessId";
var correlationId = NewId.NextGuid();
await Harness.Start();
try
{
await Harness.Bus.Publish(new StartProcess(correlationId, processId));
Assert.True(await Harness.Published.Any<StartProcess>());
Assert.True(await Harness.Consumed.Any<StartProcess>());
Assert.True(await SagaHarness.Consumed.Any<StartProcess>());
Assert.Equal(1, SagaHarness.Sagas.Count()); // HERE should be only one saga created
Assert.True(await SagaHarness.Created.Any(s => s.ProcessId == processId && s.CurrentState == "InProgress"));
await Harness.Bus.Publish(new ProcessStageFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessStageFinished>());
Assert.True(await Harness.Consumed.Any<ProcessStageFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessStageFinished>());
var saga = SagaHarness.Sagas.Select(s => s.ProcessId == processId).FirstOrDefault()?.Saga;
Assert.NotNull(saga);
Assert.Equal(1, saga.Stage); // HERE stage should by 1
await Harness.Bus.Publish(new ProcessFinished(correlationId, processId));
Assert.True(await Harness.Published.Any<ProcessFinished>());
Assert.True(await Harness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Consumed.Any<ProcessFinished>());
Assert.True(await SagaHarness.Sagas.Any(s => s.ProcessId == processId && s.CurrentState == "Final"));
}
finally
{
await Harness.Stop();
}
}
}
I have tried it on simple in-memory bus configuration and on RabbitMQ. On both configuration it works fine. Messages are consumed twice only within InMemoryTestHarness.
Do you have any suggestions what should be fixed? At first glance it looks like a wrong behavior.