0
votes

I have a RabbitMQ server (v.3.8.2) with a simple exchange fanout-queue binding running, with several producers and one consumer. The average delivery/ack rate is quite low, about 6 msg/s.

The queue is created at runtime by producers with the x-message-ttl parameter set at 900000 (15 minutes).

In very specific conditions (e.g. rare error situation), messages are rejected by the consumer. These messages then are shown in the unacked counter on the RabbitMQ admin web page indefinitely. They never expire or get discarded event after they timeout.

There are no specific per-message overrides in ttl parameters.

I do not need any dead letter processing as these particular messages do not require processing high reliabilty, and I can afford to lose some of them every now and then under those specific error conditions.

Exchange parameters:

name: poll
type: fanout
features: durable=true
bound queue: poll
routing key: poll

Queue parameters:

name: poll
features: x-message-ttl=900000 durable=true

For instance, this is what I am currently seeing in the RabbitMQ server queue admin page:

"Poll" Queue summary

As you can see, there are 12 rejected/unack'ed messages in the queue, and they have been living there for more than a week now.

How can I have the nacked messages expire as per the ttl parameter? Am I missing some pieces of configuration?

Here is an extract from the consumer source code:

// this code is executed during setup
...
consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    // Retrieve retry count & death list if present
    List<object> DeathList = ((e?.BasicProperties?.Headers != null) && e.BasicProperties.Headers.TryGetValue("x-death", out object obj)) ? obj as List<object> : null;
    int count = ((DeathList != null) &&
        (DeathList.Count > 0) &&
        (DeathList[0] is Dictionary<string, object> values) &&
        values.TryGetValue("count", out obj)
    ) ? Convert.ToInt32(obj) : 0;

    // call actual event method
    switch (OnRequestReceived(e.Body, count, DeathList))
    {
        default:
            channel.BasicAck(e.DeliveryTag, false);
            break;
        case OnReceivedResult.Reject:
            channel.BasicReject(e.DeliveryTag, false);
            break;
        case OnReceivedResult.Requeue:
            channel.BasicReject(e.DeliveryTag, true);
            break;
    }
};
...

// this is the actual "OnReceived" method

static OnReceivedResult OnRequestReceived(byte[] payload, int count, List<object> DeathList)
{
    OnReceivedResult retval = OnReceivedResult.Ack; // success by default

    try
    {
        object request = MessagePackSerializer.Typeless.Deserialize(payload);
        if (request is PollRequestContainer prc)
        {
            Log.Out(
                Level.Info,
                LogFamilies.PollManager,
                log_method,
                null,
                "RequestPoll message received did={0} type=={1} items#={2}", prc.DeviceId, prc.Type, prc.Items == null ? 0 : prc.Items.Length
            );
            if (!RequestManager.RequestPoll(prc.DeviceId, prc.Type, prc.Items)) retval = OnReceivedResult.Reject;
        }
        else if (request is PollUpdateContainer puc)
        {
            Log.Out(Level.Info, LogFamilies.PollManager, log_method, null, "RequestUpdates message received dids#={0} type=={1}", puc.DeviceIds.Length, puc.Type);
            if (!RequestManager.RequestUpdates(puc.DeviceIds, puc.Type)) retval = OnReceivedResult.Reject;
        }
        else Log.Out(Level.Error, LogFamilies.PollManager, log_method, null, "Message payload deserialization error length={0} count={1}", payload.Length, count);
    }
    catch (Exception e)
    {
        Log.Out(Level.Error, LogFamilies.PollManager, log_method, null, e, "Exception dequeueing message. Payload length={0} count={1}", payload.Length, count);
    }

    // message is rejected only if RequestUpdates() or RequestPoll() return false
    // message is always acked if an exception occurs within the try-catch or if a deserialization type check error occurs
    return retval;
}

2
What do you mean be rejected by consumer , is it like consumer does not ack them ? Also what do you mean that they continue to show up ..Soumen Mukherjee
@SoumenMukherjee the consumer is calling basicReject on the channel, rejecting it as per the AMQP specifications. Those messages then live indefinitely in the RabbbitMQ server queue under the unacked counter and are never removed.Alberto Pastore
You appear to be using two different TTL values (90000 vs 900). I'm assuming 900 is an error in your question. It would be very helpful for you to provide code and instructions to reproduce this scenario. Post them to the rabbitmq-users mailing list and the RabbitMQ team will investigate. Thanks.Luke Bakken
@LukeBakken thanks Luke for pointing out the inconsistence (it was a typo), I updated it to 900000. I'll post to the mailing list as suggested.Alberto Pastore
I can't seem to find the message on the mailing list. If you can put the link here that would be great.Luke Bakken

2 Answers

0
votes

This state occurs when the Consumer does not ack or reject both after receiving the message.
In the unacked state, the message does not expire.
After receiving the message, you must ack or reject it.

This issue isn't a problem that doesn't expire, problem is you don't ack or reject the message.

0
votes

x-message-ttl=900000 means how long the message stays in the queue without being delivered to the consumer.

In your situation, your message is already delivered to the consumer and it needs to be acked/rejected.