I'm trying to write unit tests using MassTransit. From what I found online, the best way to get access to the bus is to create it with an InMemoryTestHarness. I add my consumers and a publish observer to capture the resulting behavior.

In the example below, I send a TestRequest message to the bus, then my consumer reads the request and puts a TestResponse message back on the bus. Finally, an observer gets the response.

I don't know whether the problem is some configuration I'm missing or a task I'm not awaiting, but the request message never even arrives at the consumer.

What am I missing?

The test

[TestMethod]
public void RequestResponseBusTest()
{
    var harness = new InMemoryTestHarness();

    var consumer = new TestConsumer();
    harness.OnConfigureInMemoryBus += c =>
    {
        c.ReceiveEndpoint("testqueue", e =>
            e.Consumer(() => consumer));
    };

    var observer = new TestPublishObserver();
    harness.OnConnectObservers += c =>
    {
        c.ConnectPublishObserver(observer);
    };

    harness.Start().Wait();
    var bus = harness.Bus;

    bus.Publish(new TestRequest() { X = 99 }).Wait();

    Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
    Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
    Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");

}

And supporting classes

[Serializable]
public class TestRequest
{
    public int X { get; set; }
}

[Serializable]
public class TestResponse
{
    public int Y { get; set; }
}

public class TestConsumer : IConsumer<TestRequest>
{
    public List<TestRequest> ConsumedMessages { get; } = new List<TestRequest>();

    public Task Consume(ConsumeContext<TestRequest> context)
    {
        ConsumedMessages.Add(context.Message);
        context.Publish(new TestResponse() { Y = 123 }).Wait();
        return Task.CompletedTask;
    }
}

private class TestPublishObserver : IPublishObserver
{
    public List<TestRequest> PublishedRequests { get; } = new List<TestRequest>();
    public List<TestResponse> PublishedResponses { get; } = new List<TestResponse>();

    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        return Task.CompletedTask;
    }

    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        var msg = context.Message;

        if (msg is TestRequest)
            PublishedRequests.Add((TestRequest)(object)msg);

        if (msg is TestResponse)
            PublishedResponses.Add((TestResponse)(object)msg);

        return Task.CompletedTask;
    }

    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        return Task.CompletedTask;
    }
}
I think looking at how the consumer harness was meant to be used would help here: github.com/MassTransit/MassTransit/blob/develop/src/… - Chris Patterson
I'm aware I'm not using it the way it was meant to be used. What I'm really doing is end-to-end testing of an EventFlow application, where I put a message on the IBusControl and observe the state of the read models. To do this, I have to wait for all the consumers to finish before I can start evaluating, and I found no simple way to do that. - Ian Esteves
You can use the activity monitor: stackoverflow.com/a/56044328/1882 - Chris Patterson
I will look into it, thank you. - Ian Esteves

1 Answer

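You need to add Thread.Sleep(2000) after bus.Publish(new TestRequest() { X = 99 }).Wait(); because bus.Publish does not guarantee that the message has been delivered to the consumer. Calling Wait() on the returned task only waits for the message to be sent, not for it to be processed, so the assertions run before the consumer has received anything.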

Or, instead of sleeping for a fixed time, let the test harness wait for the consumed message:

[TestMethod]
public void RequestResponseBusTest()
{
    var harness = new InMemoryTestHarness();

    var consumer = new TestConsumer();
    harness.OnConfigureInMemoryBus += c =>
    {
        c.ReceiveEndpoint("testqueue", e =>
            e.Consumer(() => consumer));
    };

    var observer = new TestPublishObserver();
    harness.OnConnectObservers += c =>
    {
        c.ConnectPublishObserver(observer);
    };

    harness.Start().Wait();
    var bus = harness.Bus;

    bus.Publish(new TestRequest() { X = 99 }).Wait();

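    // Add this line (needs using System.Linq). Enumerating harness.Consumed
    // waits for a matching consumed message, up to the harness timeout.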
    var receivedMessage = harness.Consumed.Select<TestRequest>().FirstOrDefault();

    Assert.AreEqual(1, consumer.ConsumedMessages.Count, "consumed");
    Assert.AreEqual(1, observer.PublishedRequests.Count, "requests");
    Assert.AreEqual(1, observer.PublishedResponses.Count, "responses");

}
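
If you would rather not depend on a fixed sleep or on the harness's message lists, another possibility is to have the consumer signal the test when it is done. The following is only a sketch under assumptions: SignalingConsumer, its Consumed property, and the TestWait.CompletesWithin helper are hypothetical names introduced for illustration and are not part of MassTransit. The only MassTransit members used are the ones already in the question (IConsumer<T>, ConsumeContext<T>, context.Publish, context.Message). The message classes are repeated so the snippet stands on its own.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public class TestRequest { public int X { get; set; } }
public class TestResponse { public int Y { get; set; } }

// Hypothetical consumer (not a MassTransit type): publishes the response like
// TestConsumer does, then completes a task the test can wait on.
public class SignalingConsumer : IConsumer<TestRequest>
{
    private readonly TaskCompletionSource<TestRequest> _consumed =
        new TaskCompletionSource<TestRequest>(
            TaskCreationOptions.RunContinuationsAsynchronously);

    // Completes with the request once Consume has finished its work.
    public Task<TestRequest> Consumed => _consumed.Task;

    public async Task Consume(ConsumeContext<TestRequest> context)
    {
        // Await the publish instead of blocking on it with Wait().
        await context.Publish(new TestResponse { Y = 123 });

        // Signal the test only after the response publish has been awaited.
        _consumed.TrySetResult(context.Message);
    }
}

// Hypothetical helper: waits for a task with an upper bound so a broken
// test fails after the timeout instead of hanging forever.
public static class TestWait
{
    public static async Task<bool> CompletesWithin(Task task, TimeSpan timeout)
    {
        var finished = await Task.WhenAny(task, Task.Delay(timeout));
        return finished == task;
    }
}
```

In the test you would register a SignalingConsumer in place of TestConsumer, change the method signature to public async Task RequestResponseBusTest(), and after publishing call Assert.IsTrue(await TestWait.CompletesWithin(consumer.Consumed, TimeSpan.FromSeconds(5)), "consumed"); before the remaining assertions. The five-second limit is an arbitrary choice. Unlike Thread.Sleep(2000), the wait ends as soon as the consumer has finished.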