0
votes

I'm creating tasks at a rate of 5 per second, and the RabbitMQ management UI shows an average incoming message rate of 5.2/s. I have 240 consumers distributed across 4 virtual machines (60 per VM), and each worker processes a task that lasts 20 seconds. In theory, I should be able to handle 100K tasks without queuing.
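As a rough sanity check on these numbers, the sketch below compares the arrival rate with what 240 consumers could sustain if every task really took 20 seconds. The variable names are illustrative, and the calculation assumes each consumer handles one task at a time.

```python
# Back-of-the-envelope capacity check (figures taken from the question).
consumers = 240        # 4 VMs x 60 consumers
task_seconds = 20      # duration of one task
arrival_rate = 5.0     # tasks published per second

# Maximum sustained throughput if every consumer is always busy.
capacity = consumers / task_seconds

# Average number of consumers busy at this arrival rate (Little's law).
busy_consumers = arrival_rate * task_seconds

print(capacity, busy_consumers)
```

Under these assumptions the workers could sustain about 12 tasks per second, so 5 tasks per second should keep only about 100 of the 240 consumers busy.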

I see a large number of Unacked messages. How can I get rid of them, or add a timer to kill them? Does this point to a problem on my worker side?

How can I recover unacknowledged AMQP messages from other channels than my connection's own?

Queues tab:

Ready    Unacked   Total    Incoming   Deliver / get   Ack
21,884   960       22,844   5.0/s      0.40/s          0.40/s
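The figures from the Queues tab can be checked with similar arithmetic. This is only a sketch: the per-consumer interpretation assumes the 960 unacked messages are spread evenly over the 240 consumers.

```python
# Figures copied from the Queues tab.
ready = 21884
unacked = 960
incoming_rate = 5.0    # messages/s published
ack_rate = 0.40        # messages/s acknowledged
consumers = 240

# Average number of delivered-but-unacknowledged messages per consumer.
unacked_per_consumer = unacked / consumers

# Net rate at which the queue grows while these rates hold.
backlog_growth = incoming_rate - ack_rate

print(unacked_per_consumer, round(backlog_growth, 2))
```

An average of 4 unacked messages per consumer matches Celery's default prefetch multiplier of 4, so the Unacked count may simply be messages reserved by the workers rather than stuck ones. The ack rate of 0.40/s, far below the 5.0/s incoming rate, is what makes the Ready count grow.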

Exchange tab:

Name            Type     Features   Rate in   Rate out
stackoverflow   direct   D          5.0/s     5.0/s

This is my celeryconfig file.

from kombu import Exchange, Queue

CELERYD_CHDIR = settings.filepath
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "US/Eastern"
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml']
CELERY_IGNORE_RESULT = True
CELERY_RESULT_BACKEND = "amqp"
CELERY_RESULT_PERSISTENT = True
BROKER_URL = 'amqp://stackoverflow:stackoverflow@rabbitmq:5672'
BROKER_CONNECTION_TIMEOUT = 15
BROKER_CONNECTION_MAX_RETRIES = 5
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TASK_RESULT_EXPIRES = 7200
CELERY_IMPORTS = ("cc.modules.stackoverflow",)


CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('gold', Exchange('stackoverflow'), routing_key='stackoverflow.gold'),
    Queue('silver', Exchange('stackoverflow'), routing_key='stackoverflow.silver'),
    Queue('bronze', Exchange('stackoverflow'), routing_key='stackoverflow.bronze'),
)
CELERY_DEFAULT_EXCHANGE = "stackoverflow"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "default"
CELERY_TRACK_STARTED = True

CELERY_ROUTES = {
    'process_call'     : {'queue': 'gold', 'routing_key': 'stackoverflow.gold', 'exchange': 'stackoverflow',},
    'process_recording': {'queue': 'silver', 'routing_key': 'stackoverflow.silver', 'exchange': 'stackoverflow',},
    'process_campaign' : {'queue': 'bronze', 'routing_key': 'stackoverflow.bronze', 'exchange': 'stackoverflow',}
}
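If the unacked messages turn out to be prefetched rather than lost, one possible adjustment is to lower the prefetch limit and acknowledge late. The fragment below is a sketch using the old-style (Celery 3.x) setting names that match the rest of this file; the values are assumptions to experiment with, not a confirmed fix.

```python
# Reserve only one message per worker process instead of the default 4,
# which is the usual advice for long-running tasks.
CELERYD_PREFETCH_MULTIPLIER = 1

# Acknowledge a message after the task finishes rather than just before it
# starts, so a task interrupted by a worker crash is redelivered.
# Tasks should be idempotent when this is enabled.
CELERY_ACKS_LATE = True
```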

1 Answer

1
vote

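Message acknowledgement works like a commit in an SQL transaction: the consumer has to acknowledge each message it receives from RabbitMQ. Until it does, RabbitMQ counts the message as Unacked and redelivers it if the consumer's channel or connection closes, which protects your system against message loss.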

See the "Message acknowledgment" section of https://www.rabbitmq.com/tutorials/tutorial-two-python.html
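For a plain AMQP consumer, the acknowledgement described above can be sketched as follows. The helper `make_callback` and the `handler` argument are hypothetical names introduced for illustration; the callback signature and the `basic_ack` / `basic_nack` calls are those of the pika client used in the linked tutorial. Celery workers send acknowledgements themselves, so this applies only to hand-written consumers.

```python
def make_callback(handler):
    """Wrap `handler` so a message is acked only after it was processed."""
    def on_message(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Processing failed: return the message to the queue.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Processing succeeded: tell RabbitMQ it can drop the message.
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return on_message

# Possible wiring with pika (needs a reachable broker, so left as comments):
# import pika
# connection = pika.BlockingConnection(pika.ConnectionParameters("rabbitmq"))
# channel = connection.channel()
# channel.basic_qos(prefetch_count=1)
# channel.basic_consume(queue="gold", on_message_callback=make_callback(print))
# channel.start_consuming()
```

A message stays Unacked between delivery and the `basic_ack` call, so a handler that never returns keeps its message in that state.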