I'm trying to write unit tests using MassTransit. When looking online, I found the best way to access the Bus would be creating it using an InMemoryTestHarness. I add my consumers and a PublishObserver to get the resulting behavior.
In the example below, I send a TestRequest message to the bus, then my consumer reads the request and puts a TestResponse message back on the bus. Finally, an observer gets the response.
I don't know if the problem is with some configuration I'm missing, or if there is some task I'm not waiting, but the request message never even arrives at the consumer.
What am I missing?
The test
[TestMethod]
public void RequestResponseBusTest()
{
var harness = new InMemoryTestHarness();
var consumer = new TestConsumer();
harness.OnConfigureInMemoryBus += c =>
{
c.ReceiveEndpoint("testqueue", e =>
e.Consumer(() => consumer));
};
var observer = new TestPublishObserver();
harness.OnConnectObservers += c =>
{
c.ConnectPublishObserver(observer);
};
harness.Start().Wait();
var bus = harness.Bus;
bus.Publish(new TestRequest() { X = 99 }).Wait();
Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");
}
And supporting classes
[Serializable]
public class TestRequest
{
public int X { get; set; }
}
[Serializable]
public class TestResponse
{
public int Y { get; set; }
}
public class TestConsumer : IConsumer<TestRequest>
{
public List<TestRequest> ConsumedMessages { get; } = new List<TestRequest>();
public Task Consume(ConsumeContext<TestRequest> context)
{
ConsumedMessages.Add(context.Message);
context.Publish(new TestResponse() { Y = 123 }).Wait();
return Task.CompletedTask;
}
}
private class TestPublishObserver : IPublishObserver
{
public List<TestRequest> PublishedRequests { get; } = new List<TestRequest>();
public List<TestResponse> PublishedResponses { get; } = new List<TestResponse>();
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
var msg = context.Message;
if (msg is TestRequest)
PublishedRequests.Add((TestRequest)(object)msg);
if (msg is TestResponse)
PublishedResponses.Add((TestResponse)(object)msg);
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
return Task.CompletedTask;
}
}