0
votes

I am trying to assign a priority level to both of my queues in RabbitMQ so that my workers will always consume and clear out all messages from Queue1 first before consuming from Queue2. I use a celery configuration file, called celeryconfig.py, that looks like this:

import ssl
broker_url="amqps://USR:PWD@URL//"
result_backend="db+postgresql://USR:PWD@BURL?sslmode=verify-full&sslrootcert=/usr/local/share/ca-certificates/MY_CACERT.crt"
include=["my_tasks"]
task_acks_late=True
task_default_rate_limit="150/m"
task_time_limit=300
worker_prefetch_multiplier=1
worker_max_tasks_per_child=2
timezone="UTC"
broker_use_ssl = {
    'keyfile': '/usr/local/share/private/MY_KEY.key',
    'certfile': '/usr/local/share/ca-certificates/MY_CERT.crt',
    'ca_certs': '/usr/local/share/ca-certificates/MY_CACERT.crt',
    'cert_reqs': ssl.CERT_REQUIRED,
    'ssl_version': ssl.PROTOCOL_TLSv1_2,
}

Currently I only have one queue, and this is how I start the Celery workers:

celery -A celery_app worker -l info --config celeryconfig --concurrency=16 -n "%h:celery-worker" -O fair

I have read the short section on RabbitMQ priorities in the docs (https://docs.celeryproject.org/en/v4.3.0/userguide/routing.html#routing-options-rabbitmq-priorities), but it only covers setting the maximum priority level. It does not explain how to set a priority level for each individual queue in RabbitMQ.

  • RabbitMQ: 3.7.17
  • Celery: 4.3.0
  • Python: 3.6.7
  • OS: Ubuntu 18.04.3 LTS bionic

Can someone shed some light on this? Thank you


1 Answer

0
votes

I am not familiar with Celery at all, but other systems can run separate workers per queue (or per some other filter), and each worker can have its own config for messages consumed per second, concurrency, etc.

You can create two Celery configs, one with a priority of, e.g., 10 and the other with a priority of 5, and run two "instances" of Celery, one per config.

This should work much better; per-message priority within the same worker does not work so well.
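
A rough sketch of the two-instance idea, not a tested setup. The queue names queue1 and queue2, the task name my_tasks.urgent_task, and the 12/4 concurrency split are all assumptions made for illustration. It relies on Celery's automatic routing, where a queue named in task_routes or passed with -Q is created if it does not exist yet. For simplicity it uses one shared celeryconfig.py and selects the queue per worker on the command line rather than keeping two config files.

```python
# celeryconfig.py (additions to the existing settings)
# All names below are assumptions for illustration, not taken from the question.

# Tasks without an explicit route go to the low-priority queue.
task_default_queue = "queue2"

# Route the urgent task (hypothetical name) to the high-priority queue.
task_routes = {
    "my_tasks.urgent_task": {"queue": "queue1"},
}

# Then start one worker per queue, giving queue1 the larger share of processes:
#
#   celery -A celery_app worker -l info --config celeryconfig -Q queue1 --concurrency=12 -n "%h:celery-worker-q1" -O fair
#   celery -A celery_app worker -l info --config celeryconfig -Q queue2 --concurrency=4 -n "%h:celery-worker-q2" -O fair
```

Note that this gives Queue1 its own dedicated capacity rather than a strict guarantee that Queue1 is empty before anything from Queue2 runs; the Queue2 worker keeps consuming regardless of what is waiting in Queue1.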