I ran into an issue with IMessageSessionAsyncHandlerFactory: new instances of IMessageSessionAsyncHandler are not created after the write volume drops to 0 and then returns to a normal level.

To be more precise, I'm using SessionHandlerOptions with MaxConcurrentSessions set to 500, which allows reading at more than 1k msg/s. The queue I'm reading from is partitioned. The volume of messages in the queue is fairly constant, but from time to time it drops to 0. When the volume returns to its normal level, the SessionHandlerFactory does not spawn any handlers, so I can no longer read messages. It's as if the sessions were not correctly recycled or are stuck in some kind of perpetual wait.

Here is the code that registers the factory:

private void RegisterHandler()
{
    var sessionHandlerOptions = new SessionHandlerOptions
    {
        AutoRenewTimeout = TimeSpan.FromMinutes(1),
        MessageWaitTimeout = TimeSpan.FromSeconds(1),
        MaxConcurrentSessions = 500
    };
    _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions);
}

The factory class:

public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
    private readonly Action<BrokeredMessage> _callback;

    public SessionHandlerFactory(Action<BrokeredMessage> callback)
    {
        _callback = callback;
    }

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
    {
        return new SessionHandler(session.SessionId, _callback);
    }

    public void DisposeInstance(IMessageSessionAsyncHandler handler)
    {
        var disposable = handler as IDisposable;

        disposable?.Dispose();
    }
}

And the handler:

public class SessionHandler : MessageSessionAsyncHandler
{
    private readonly Action<BrokeredMessage> _callback;

    public SessionHandler(string sessionId, Action<BrokeredMessage> callback)
    {
        SessionId = sessionId;
        _callback = callback;
    }

    public string SessionId { get; }

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        try
        {
            _callback(message);
        }
        catch (Exception ex)
        {
            Logger.Error(...);
        }
    }
}

I can see that the session handlers are closed and that the factories are disposed when writing/reading is at a normal level. However, once the queue empties, no new session handlers are created. Is there a policy for allocating session IDs that forbids reallocating the same sessions after a period of inactivity?

Edit 1: I'm adding two pictures to illustrate the behavior: [image]

When the writer is stopped and restarted, the running reader is not able to read as much as before.

The number of sessions created after that moment is also much lower than before: [image]

1 Answer

"The volume of messages in the queue is fairly constant, but from time to time it drops to 0. When the volume returns to its normal level, the SessionHandlerFactory does not spawn any handlers, so I can no longer read messages. It's as if the sessions were not correctly recycled or are stuck in some kind of perpetual wait."

Since you are using IMessageSessionAsyncHandlerFactory to control how the IMessageSessionAsyncHandler instances are created, you could log the creation and disposal of every IMessageSessionAsyncHandler instance to see where the handlers stop being recycled.
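
Such logging could look roughly like the sketch below. It is only an illustration under a few assumptions: LoggingSessionHandlerFactory and LoggingSessionHandler are hypothetical names, the output goes to the console instead of a real logger, and the OnCloseSessionAsync and OnSessionLostAsync overrides are assumed to be available on MessageSessionAsyncHandler in the WindowsAzure.ServiceBus package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // WindowsAzure.ServiceBus NuGet package

// Hypothetical factory that counts and logs handler creation and disposal.
public class LoggingSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
    private readonly Action<BrokeredMessage> _callback;
    private int _active; // handlers created but not yet disposed

    public LoggingSessionHandlerFactory(Action<BrokeredMessage> callback)
    {
        _callback = callback;
    }

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
    {
        var active = Interlocked.Increment(ref _active);
        Console.WriteLine("{0:o} created handler for session {1} (active: {2})",
            DateTime.UtcNow, session.SessionId, active);
        return new LoggingSessionHandler(session.SessionId, _callback);
    }

    public void DisposeInstance(IMessageSessionAsyncHandler handler)
    {
        var active = Interlocked.Decrement(ref _active);
        var sessionId = (handler as LoggingSessionHandler)?.SessionId;
        Console.WriteLine("{0:o} disposed handler for session {1} (active: {2})",
            DateTime.UtcNow, sessionId, active);
        (handler as IDisposable)?.Dispose();
    }
}

// Hypothetical handler that also logs session close and session loss.
public class LoggingSessionHandler : MessageSessionAsyncHandler
{
    private readonly Action<BrokeredMessage> _callback;

    public LoggingSessionHandler(string sessionId, Action<BrokeredMessage> callback)
    {
        SessionId = sessionId;
        _callback = callback;
    }

    public string SessionId { get; }

    protected override Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        try
        {
            _callback(message);
        }
        catch (Exception ex)
        {
            Console.WriteLine("{0:o} callback failed for session {1}: {2}",
                DateTime.UtcNow, SessionId, ex);
        }
        return Task.CompletedTask; // requires .NET Framework 4.6 or later
    }

    protected override Task OnCloseSessionAsync(MessageSession session)
    {
        Console.WriteLine("{0:o} session {1} closed", DateTime.UtcNow, SessionId);
        return Task.CompletedTask;
    }

    protected override Task OnSessionLostAsync(Exception exception)
    {
        Console.WriteLine("{0:o} session {1} lost: {2}", DateTime.UtcNow, SessionId, exception);
        return Task.CompletedTask;
    }
}
```

If the "active" count stays near MaxConcurrentSessions after the queue has drained, the handlers are probably not being released; if it falls to 0 and never rises again, the pump itself has likely stopped accepting sessions.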

Based on your code, I created a console application to reproduce this issue on my side. Here is my code snippet for initializing the queue client and handling messages:

InitializeReceiver

static void InitializeReceiver(string connectionString, string queuePath)
{
    _queueClient = QueueClient.CreateFromConnectionString(connectionString, queuePath, ReceiveMode.PeekLock);

    var sessionHandlerOptions = new SessionHandlerOptions
    {
        AutoRenewTimeout = TimeSpan.FromMinutes(1),
        MessageWaitTimeout = TimeSpan.FromSeconds(5),
        MaxConcurrentSessions = 500
    };
    _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(OnMessageHandler), sessionHandlerOptions);
}
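
Errors raised inside the session pump are another thing worth surfacing when handlers stop being created. The following is a minimal sketch, assuming the ExceptionReceived event on SessionHandlerOptions; the ReceiverDiagnostics class and its RegisterAsync method are hypothetical names, and the option values simply mirror the registration above.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // WindowsAzure.ServiceBus NuGet package

// Hypothetical helper: registers a session handler factory and logs pump errors.
public static class ReceiverDiagnostics
{
    public static Task RegisterAsync(QueueClient queueClient, IMessageSessionAsyncHandlerFactory factory)
    {
        var options = new SessionHandlerOptions
        {
            AutoRenewTimeout = TimeSpan.FromMinutes(1),
            MessageWaitTimeout = TimeSpan.FromSeconds(5),
            MaxConcurrentSessions = 500
        };

        // Raised when the pump hits an exception, for example while accepting
        // a session, receiving a message, or renewing a lock.
        options.ExceptionReceived += (sender, e) =>
            Console.WriteLine("{0:o} pump error during '{1}': {2}",
                DateTime.UtcNow, e.Action, e.Exception);

        // Return the task so the caller can await it and observe registration failures.
        return queueClient.RegisterSessionHandlerFactoryAsync(factory, options);
    }
}
```

A call such as ReceiverDiagnostics.RegisterAsync(_queueClient, new SessionHandlerFactory(OnMessageHandler)) would replace the direct registration; awaiting the returned task makes a failed registration visible instead of silently dropped.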

OnMessageHandler

static void OnMessageHandler(BrokeredMessage message)
{
    var body = message.GetBody<Stream>();

    dynamic recipeStep = JsonConvert.DeserializeObject(new StreamReader(body, true).ReadToEnd());
    lock (Console.Out)
    {
        Console.ForegroundColor = ConsoleColor.Cyan;
        Console.WriteLine(
            "Message received:  \n\tSessionId = {0}, \n\tMessageId = {1}, \n\tSequenceNumber = {2}," +
            "\n\tContent: [ title = {3} ]",
            message.SessionId,
            message.MessageId,
            message.SequenceNumber,
            recipeStep.title);
        Console.ResetColor();
    }
    Task.Delay(TimeSpan.FromSeconds(3)).Wait();
    message.Complete();
}

Per my test, the SessionHandler worked as expected while the volume of messages in the queue went from normal to zero and from zero back to normal over a period of time.

I also tested this with QueueClient.RegisterSessionHandlerAsync, and it works as well. Additionally, I found this git sample about Service Bus sessions, which you could refer to.