13 votes

I've been working with Celery lately and I don't like it. Its configuration is messy, overcomplicated and poorly documented.

I want to send broadcast messages with Celery from a single producer to multiple consumers. What confuses me is the discrepancy between Celery's terminology and that of the underlying transport, RabbitMQ.

In RabbitMQ you can have a single fanout Exchange and multiple Queues to broadcast messages:

[Diagram: one fanout exchange delivering each message to several queues]

But in Celery the terms are all messed up: there you can have a broadcast queue, which sends messages to multiple consumers:

[Diagram: a single Celery broadcast queue delivering messages to multiple consumers]

I don't even understand how a Celery broadcast queue is supposed to work at all, because RabbitMQ queues with multiple consumers are meant for load balancing. In RabbitMQ, if multiple consumers (i.e. a pool of consumers) are connected to the same queue, each message is received and processed by only one of them, which the RabbitMQ docs call round-robin dispatching.
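To make the contrast concrete, here is a toy, in-memory model of the two delivery patterns: one queue shared by several consumers versus a fanout exchange with one queue per consumer. It is plain Python, not RabbitMQ; the function names are hypothetical, and it ignores prefetch, acknowledgements and redelivery.

```python
from itertools import cycle


def shared_queue(messages, consumers):
    """One queue, many consumers: each message is delivered to exactly one consumer."""
    inboxes = {c: [] for c in consumers}
    turn = cycle(consumers)  # idealised round-robin dispatch
    for msg in messages:
        inboxes[next(turn)].append(msg)
    return inboxes


def fanout_exchange(messages, consumers):
    """Fanout exchange with one queue per consumer: every consumer gets every message."""
    inboxes = {c: [] for c in consumers}
    for msg in messages:
        for c in consumers:  # the exchange copies the message into each bound queue
            inboxes[c].append(msg)
    return inboxes


workers = ['worker1', 'worker2']
print(shared_queue(['m1', 'm2', 'm3'], workers))
# {'worker1': ['m1', 'm3'], 'worker2': ['m2']}
print(fanout_exchange(['m1', 'm2', 'm3'], workers))
# {'worker1': ['m1', 'm2', 'm3'], 'worker2': ['m1', 'm2', 'm3']}
```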

Also, the Celery documentation on broadcast is really insufficient. What type of RabbitMQ exchange should I specify for a Broadcast queue: fanout or something else? Could you supply a full example?

So, what I'm asking for is (1) a clarification of the concept and implementation of Broadcast queues in Celery and (2) a complete example of a Broadcast queue configuration. Thank you.

Does this help? celery.readthedocs.org/en/latest/userguide/… It appears the 'queue' definition in Celery includes the exchange, so possibly you can define a Celery queue on top of a fanout exchange, which will have an underlying implementation of multiple RabbitMQ queues. In this case I would guess you don't want a 'broadcast' queue in the Celery config, unless you really want multiple workers processing the same task. - Anentropic
@Anentropic Thanks for the reply. I've been using that page extensively, but as you can see, the definition of a Broadcast queue there is CELERY_QUEUES = (Broadcast('broadcast_tasks'), ) and it doesn't specify the exchange at all, unlike the normal CELERY_QUEUES = (Queue(name, exchange, routing_key), ) in the examples you pointed to. I've been looking for Broadcast in the API reference, but can't find it. - Boris Burkov
What I was saying is: I don't think you want to use a broadcast queue at all. I think you want to define a normal Celery queue on top of an exchange of type 'fanout', i.e. Exchange(name, type='fanout'). - Anentropic
@Anentropic Well, I came to the same conclusion. :) Thanks, Anentropic, let this Broadcast queue be a mystery of Celery, whatever it is. - Boris Burkov
Thanks @BorisBurkov. I found what my issue was and left a comment under the accepted answer. - Greg0ry

3 Answers

2 votes

Does this help?
http://celery.readthedocs.org/en/latest/userguide/routing.html#exchanges-queues-and-routing-keys

It appears the 'queue' definition in Celery includes the exchange, so you can define a Celery queue on top of an exchange of type 'fanout' (Exchange(name, type='fanout')), which will have an underlying implementation of multiple RabbitMQ queues.

In this case I would guess you don't want a 'broadcast' queue in the Celery config, unless you really want multiple workers processing the same task.

3 votes

Having looked at the code (it's in the kombu.common module, not in Celery) and tried it out, I believe it works like this:

  • You define a Broadcast 'queue' named 'foo' in your Celery config.
  • This creates a fanout Exchange named 'foo' and an auto_delete queue with a unique name (generated via uuid) and the alias 'foo' (I don't think the alias is actually used anywhere; it's just there for reference, because the queue's real name is randomly generated).
  • The unique queue is bound to the 'foo' exchange.

So, the class is named Broadcast, but it's really a uniquely named queue that is bound to a fanout exchange. Therefore, when each worker is started, it creates its own unique queue and binds it to the fanout exchange, so every worker receives its own copy of each message.

1 vote

If you are using Celery 4.0.1+ and broadcast is not working for you, please check https://github.com/celery/celery/pull/3934 and see clokep's solution: it restores the previous version of amqp.py, and that works for me.