6
votes

I've noticed that celery has been sending tasks to multiple queues, and workers on both queues have been executing the tasks.

My queue definitions are:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('client1', Exchange('client1'), routing_key='client1'),
    Queue('images', Exchange('media'), routing_key='media.images'),
)

And when, after stopping all my workers, I run:

>>> tasks.ping.apply_async(queue='default')

I can see the task appear in both the default and client1 queues:

$ redis-cli -c llen default
(integer) 1
$ redis-cli -c llen client1
(integer) 1

This only applies to the default queue. Sending it directly to the client1 queue only adds it there:

>>> tasks.ping.apply_async(queue='client1')
$ redis-cli -c llen default
(integer) 1
$ redis-cli -c llen client1
(integer) 2

The images queue never receives tasks incorrectly.

This is Celery 3.1.15 with the Redis broker.

1

1 Answers

8
votes

Okay! It looks like the problem is that Kombu's Redis broker doesn't clear old exchanges + routing keys.

Initially I had configured the queues:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('client1', Exchange('default'), routing_key='default'),
)

And later changed them to use a separate exchange and routing key for client1.

But for some reason Kombu didn't clear out the old bindings, so I was left with:

redis> smembers _kombu.binding.default
1) "default\x06\x16\x06\x16client1"
2) "default.client1\x06\x16\x06\x16client1"
3) "default\x06\x16\x06\x16default"

So tasks sent to default were being routed to both the default and client1 queues.

The fix was to remove the incorrect bindings:

redis> srem _kombu.binding.default "default\x06\x16\x06\x16client1"
redis> srem _kombu.binding.default "default.client1\x06\x16\x06\x16client1"