0
votes

I have the following scripts:

celery_tasks.py

from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'

@app.task(acks_late=True)
def test(a):
   return a

publish.py

from celery_tasks import test
test.delay('abc')

When I run publish.py and start the worker (celery -A celery_tasks worker --loglevel=DEBUG), the 'abc' content is published to 'test_queue' and consumed by the worker.

Is there a way for the worker to consume a message from a queue that was not published by Celery? For example, when I put something in test_queue directly through RabbitMQ, without going through the Celery publisher, and then run the Celery worker, it gives me the following warning:

WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

The full contents of the message body was: body: 'abc' (3b)

{content_type:None content_encoding:None delivery_info:{'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue'} headers={}}

Is there a way to solve this?

2
How did you manually publish the message to RabbitMQ? From poking around the docs, it looks like content_type and content_encoding are required, and there may be other required fields as well. - miah

2 Answers

1
votes

Celery uses a specific message format, including a set of headers, and a message must follow it for the worker to accept it. To produce a Celery-compliant message outside of Celery, you would therefore have to reverse engineer that format. Keep in mind that Celery is not really made to send arbitrary messages across the broker, but to send tasks, which are enhanced messages and therefore carry extra fields in the header part of the AMQP message.
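To give a rough idea of what "Celery-compliant" means, here is a minimal sketch of the pieces such a message seems to need, based on my reading of Celery's message protocol version 2. The helper name build_task_message is hypothetical, the field list is an assumption that may differ between Celery versions, and the task name 'celery_tasks.test' assumes the task lives in celery_tasks.py as in the question. It only builds the message parts with the standard library; it does not publish anything.

```python
import json
import uuid


def build_task_message(task_name, args=(), kwargs=None):
    """Build (body, headers, properties) shaped like a Celery task message.

    Hypothetical helper: the layout follows my understanding of Celery's
    message protocol version 2 and is not guaranteed for every version.
    """
    task_id = str(uuid.uuid4())

    # The body is a JSON-encoded triple: positional args, keyword args,
    # and an "embed" mapping for callbacks, errbacks, chains and chords.
    embed = {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
    body = json.dumps([list(args), kwargs or {}, embed])

    # Task metadata goes in the AMQP application headers, not in the body.
    headers = {
        'lang': 'py',
        'task': task_name,
        'id': task_id,
        'root_id': task_id,
        'parent_id': None,
        'group': None,
    }

    # Message properties tell the worker how to decode the body.
    properties = {
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
    return body, headers, properties


body, headers, properties = build_task_message('celery_tasks.test', args=['abc'])
```

The three return values would then be passed to whatever client publishes the message (body as the payload, headers as the application headers, properties as the AMQP message properties), with test_queue as the routing key on the default exchange. A message like the one in the question, with content_type:None and headers={}, carries none of this, which is presumably why the worker reports it as unknown.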

1
votes

It's a late answer, but custom consumers might help you. I'm using them to consume messages from RabbitMQ that are published by another app using pika.

http://docs.celeryproject.org/en/latest/userguide/extending.html#custom-message-consumers