I'm creating tasks at a rate of 5 per second, and the RabbitMQ management UI shows an average incoming message rate of 5.2/s. I have 240 consumers distributed across 4 virtual machines (60 per VM), and each worker processes a task that lasts 20 seconds. In theory I should be able to handle 100K tasks without queuing.
I see a large number of Unacked messages. How can I get rid of them, or add a timer to kill them? Does this point to a problem on my worker side?
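To make the capacity reasoning explicit, here is a rough back-of-the-envelope check. It is only a sketch using the figures quoted above, and it assumes every task takes exactly 20 seconds and that all 240 workers are actually pulling work; the variable names are mine.

```python
# Back-of-the-envelope capacity check using the figures from the question.
consumers = 4 * 60      # 4 VMs x 60 workers each
task_seconds = 20       # assumed fixed task duration, in seconds
arrival_rate = 5.0      # tasks created per second

# Maximum sustained throughput if every worker is always busy.
max_throughput = consumers / task_seconds

# Little's law: average busy workers = arrival rate x service time.
busy_workers = arrival_rate * task_seconds

print(max_throughput)   # 12.0 tasks/s
print(busy_workers)     # 100.0 workers
```

Under those assumptions about 100 of the 240 workers should be busy at any time, so the queue should not grow.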
How can I recover unacknowledged AMQP messages from channels other than my connection's own?
Queues tab:

Ready    Unacked  Total    incoming  deliver / get  ack
21,884   960      22,844   5.0/s     0.40/s         0.40/s

Exchange tab:

Name           Type    Features  Message rate in  Message rate out
stackoverflow  direct  D         5.0/s            5.0/s
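Putting the management UI numbers next to each other, as a rough sketch: the figures are copied from the Queues tab above, and the interpretation assumes the ack rate roughly tracks the rate at which tasks are actually being run. The variable names are mine.

```python
# Figures copied from the RabbitMQ Queues tab in the question.
ready, unacked, total = 21884, 960, 22844
incoming_rate = 5.0     # messages/s published
ack_rate = 0.40         # messages/s acknowledged
consumers = 240
task_seconds = 20       # assumed task duration

# Sanity check: Ready + Unacked should equal Total.
assert ready + unacked == total

# Unacked messages held per consumer.
unacked_per_consumer = unacked / consumers

# Little's law applied to the ack rate: tasks effectively in progress.
effective_workers = ack_rate * task_seconds

# Net rate at which the backlog grows.
backlog_growth = incoming_rate - ack_rate

print(unacked_per_consumer)          # 4.0
print(round(effective_workers, 2))
print(round(backlog_growth, 2))
```

If I read the Celery docs correctly, 4 unacked messages per consumer matches the default prefetch multiplier of 4, so the Unacked count may simply be prefetched messages. The ack rate, on the other hand, suggests far fewer than 240 tasks are being run at any one time.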
This is my celeryconfig file:
from kombu import Exchange, Queue  # used by CELERY_QUEUES below
CELERYD_CHDIR = settings.filepath
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "US/Eastern"
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml']
CELERY_IGNORE_RESULT = True
CELERY_RESULT_BACKEND = "amqp"
CELERY_RESULT_PERSISTENT = True
BROKER_URL = 'amqp://stackoverflow:stackoverflow@rabbitmq:5672'
BROKER_CONNECTION_TIMEOUT = 15
BROKER_CONNECTION_MAX_RETRIES = 5
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TASK_RESULT_EXPIRES = 7200
CELERY_IMPORTS = ("cc.modules.stackoverflow",)  # trailing comma makes this a tuple
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('gold', Exchange('stackoverflow'), routing_key='stackoverflow.gold'),
    Queue('silver', Exchange('stackoverflow'), routing_key='stackoverflow.silver'),
    Queue('bronze', Exchange('stackoverflow'), routing_key='stackoverflow.bronze'),
)
CELERY_DEFAULT_EXCHANGE = "stackoverflow"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "default"
CELERY_TRACK_STARTED = True
CELERY_ROUTES = {
    'process_call': {'queue': 'gold', 'routing_key': 'stackoverflow.gold', 'exchange': 'stackoverflow'},
    'process_recording': {'queue': 'silver', 'routing_key': 'stackoverflow.silver', 'exchange': 'stackoverflow'},
    'process_campaign': {'queue': 'bronze', 'routing_key': 'stackoverflow.bronze', 'exchange': 'stackoverflow'},
}
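One thing not set anywhere in the config above is the prefetch behaviour. As far as I understand, the old-style settings CELERYD_PREFETCH_MULTIPLIER (default 4) and CELERY_ACKS_LATE (default False) control how many messages each worker process reserves and when they are acknowledged. The values below are only an experiment I am considering, not part of my current file, and I have not verified that they fix the problem.

```python
# Hypothetical additions to celeryconfig, not in the current file.
# Reserve at most one message per worker process instead of the default 4.
CELERYD_PREFETCH_MULTIPLIER = 1
# Acknowledge a message only after its task has finished, not when it starts.
CELERY_ACKS_LATE = True
```

With late acks a message stays Unacked for the whole run of its task, so some Unacked messages would still be expected while workers are busy.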