I have a RabbitMQ server (v.3.8.2) running a simple setup: one fanout exchange bound to a single queue, with several producers and one consumer. The average delivery/ack rate is quite low, about 6 msg/s.
The queue is created at runtime by producers with the x-message-ttl parameter set to 900000 (15 minutes).
In very specific conditions (e.g. a rare error situation), messages are rejected by the consumer. These messages are then shown in the unacked counter on the RabbitMQ admin web page indefinitely. They never expire or get discarded, even after their TTL has elapsed.
There are no specific per-message overrides in ttl parameters.
I do not need any dead letter processing, as these particular messages do not require highly reliable processing, and I can afford to lose some of them every now and then under those specific error conditions.
Exchange parameters:
    name: poll
    type: fanout
    features: durable=true
    bound queue: poll
    routing key: poll
Queue parameters:
    name: poll
    features: x-message-ttl=900000 durable=true
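For reference, the parameters above correspond roughly to the following declaration. This is only a sketch, assuming the `IModel` API of the RabbitMQ .NET client (5.x/6.x); the `PollTopology` class and `Declare` method are hypothetical names, and the `exclusive` and `autoDelete` values are assumptions, since they are not listed in the parameters above.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

public static class PollTopology
{
    // Hypothetical helper: "channel" is an already open IModel.
    public static void Declare(IModel channel)
    {
        // Durable fanout exchange named "poll".
        channel.ExchangeDeclare(exchange: "poll", type: ExchangeType.Fanout,
                                durable: true, autoDelete: false, arguments: null);

        // Queue-level TTL: 900000 ms = 15 minutes.
        var arguments = new Dictionary<string, object>
        {
            { "x-message-ttl", 900000 }
        };

        // exclusive/autoDelete are assumed values, not taken from the question.
        channel.QueueDeclare(queue: "poll", durable: true, exclusive: false,
                             autoDelete: false, arguments: arguments);

        // A fanout exchange ignores the routing key; it is kept here only
        // to mirror the binding listed above.
        channel.QueueBind(queue: "poll", exchange: "poll", routingKey: "poll",
                          arguments: null);
    }
}
```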
For instance, this is what I am currently seeing in the RabbitMQ server queue admin page:
As you can see, there are 12 rejected/unack'ed messages in the queue, and they have been living there for more than a week now.
How can I have the nacked messages expire as per the ttl parameter? Am I missing some pieces of configuration?
Here is an extract from the consumer source code:
// this code is executed during setup
...
consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    // Retrieve retry count & death list if present
    List<object> DeathList = ((e?.BasicProperties?.Headers != null) && e.BasicProperties.Headers.TryGetValue("x-death", out object obj)) ? obj as List<object> : null;
    int count = ((DeathList != null) &&
                 (DeathList.Count > 0) &&
                 (DeathList[0] is Dictionary<string, object> values) &&
                 values.TryGetValue("count", out obj)
                ) ? Convert.ToInt32(obj) : 0;
    // call actual event method
    switch (OnRequestReceived(e.Body, count, DeathList))
    {
        default:
            channel.BasicAck(e.DeliveryTag, false);
            break;
        case OnReceivedResult.Reject:
            channel.BasicReject(e.DeliveryTag, false);
            break;
        case OnReceivedResult.Requeue:
            channel.BasicReject(e.DeliveryTag, true);
            break;
    }
};
...
// this is the actual "OnReceived" method
static OnReceivedResult OnRequestReceived(byte[] payload, int count, List<object> DeathList)
{
    OnReceivedResult retval = OnReceivedResult.Ack; // success by default
    try
    {
        object request = MessagePackSerializer.Typeless.Deserialize(payload);
        if (request is PollRequestContainer prc)
        {
            Log.Out(
                Level.Info,
                LogFamilies.PollManager,
                log_method,
                null,
                "RequestPoll message received did={0} type=={1} items#={2}", prc.DeviceId, prc.Type, prc.Items == null ? 0 : prc.Items.Length
            );
            if (!RequestManager.RequestPoll(prc.DeviceId, prc.Type, prc.Items)) retval = OnReceivedResult.Reject;
        }
        else if (request is PollUpdateContainer puc)
        {
            Log.Out(Level.Info, LogFamilies.PollManager, log_method, null, "RequestUpdates message received dids#={0} type=={1}", puc.DeviceIds.Length, puc.Type);
            if (!RequestManager.RequestUpdates(puc.DeviceIds, puc.Type)) retval = OnReceivedResult.Reject;
        }
        else Log.Out(Level.Error, LogFamilies.PollManager, log_method, null, "Message payload deserialization error length={0} count={1}", payload.Length, count);
    }
    catch (Exception e)
    {
        Log.Out(Level.Error, LogFamilies.PollManager, log_method, null, e, "Exception dequeueing message. Payload length={0} count={1}", payload.Length, count);
    }
    // message is rejected only if RequestUpdates() or RequestPoll() return false
    // message is always acked if an exception occurs within the try-catch or if a deserialization type check error occurs
    return retval;
}
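The retry-count lookup at the top of the handler can also be read in isolation. Below is a minimal sketch of the same logic as a pure function; `DeathHeader` and `GetDeathCount` are hypothetical names that are not part of the consumer above. The `x-death` header is normally added by the broker when a message is dead-lettered, so with no dead letter exchange configured the function would be expected to return 0.

```csharp
using System;
using System.Collections.Generic;

public static class DeathHeader
{
    // Returns the "count" field of the first x-death entry, or 0 when the
    // header is missing or does not have the expected shape.
    public static int GetDeathCount(IDictionary<string, object> headers)
    {
        if (headers == null || !headers.TryGetValue("x-death", out object raw))
            return 0;
        if (!(raw is List<object> deaths) || deaths.Count == 0)
            return 0;
        if (!(deaths[0] is IDictionary<string, object> first))
            return 0;
        return first.TryGetValue("count", out object count)
            ? Convert.ToInt32(count)
            : 0;
    }
}
```

Inside the handler it would be called as `DeathHeader.GetDeathCount(e.BasicProperties?.Headers)`.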
[...] basicReject on the channel, rejecting it as per the AMQP specifications. Those messages then live indefinitely in the RabbitMQ server queue under the unacked counter and are never removed. – Alberto Pastore
[...] 900 is an error in your question. It would be very helpful for you to provide code and instructions to reproduce this scenario. Post them to the rabbitmq-users mailing list and the RabbitMQ team will investigate. Thanks. – Luke Bakken
[...] 900000. I'll post to the mailing list as suggested. – Alberto Pastore