2
votes

I'm using RabbitMQ for the first time and I must be misunderstanding some simple configuration settings. Note that I am encountering this issue while running the app locally right now; I have not yet attempted to launch to production via Heroku.

For this app, every 20 seconds I want to look for some unsent messages in the database, and send them via Twilio. Apologies in advance if I've left some relevant code out of my examples below. I've followed all of the Celery setup/config instructions. Here is my current setup:

BROKER_URL = 'amqp://VflhnMEP:8wGLOrNBP.........Bhshs'  # Truncated URL string

from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    'send_queued_messages_every_20_seconds': {
        'task': 'comm.tasks.send_queued_messages',
        'schedule': timedelta(seconds=20),
        # 'schedule': crontab(seconds='*/20')
        },
    }
CELERY_TIMEZONE = 'UTC'

I am pretty sure that the tasks are being racked up in RabbitMQ; here is the dash that I can see with all of the accumulated messages:

enter image description here

The function, 'send_queued_messages' should be called every 20 seconds.

comm/tasks.py

import datetime from celery.decorators import periodic_task

from comm.utils import get_user_mobile_number
from comm.api import get_twilio_connection, send_message
from dispatch.models import Message

@periodic_task
def send_queued_messages(run_every=datetime.timedelta(seconds=20)):
    unsent_messages = Message.objects.filter(sent_success=False)
    connection = get_twilio_connection()

    for message in unsent_messages:
        mobile_number = get_user_mobile_number(message=message)
        try:
            send_message(
                connection=connection,
                mobile_number=mobile_number,
                message=message.raw_text
                )
            message.sent_success=True
            message.save()
        except BaseException as e:
            raise e
            pass

I'm pretty sure that I have something misconfigured with RabbitMQ or in my Heroku project settings, but I'm not sure how to continue troubleshooting. When I run 'celery -A myproject beat' everything appears to be running smoothly.

(venv)josephs-mbp:myproject josephfusaro$ celery -A myproject beat
celery beat v3.1.18 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://VflhnMEP:**@happ...Bhshs
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2015-05-27 03:01:53,810: INFO/MainProcess] beat: Starting...
[2015-05-27 03:02:13,941: INFO/MainProcess] Scheduler: Sending due task send_queued_messages_every_20_seconds (comm.tasks.send_queued_messages)
[2015-05-27 03:02:34,036: INFO/MainProcess] Scheduler: Sending due task send_queued_messages_every_20_seconds (comm.tasks.send_queued_messages)

So why aren't the tasks executing as they do without Celery being involved*?

My Procfile:

web: gunicorn myproject.wsgi --log-file - worker: celery -A myproject beat

*I have confirmed that my code executes as expected without Celery being involved!

1
To which queue tasks are published and from which queue the worker or workers you have are consuming ? The default one ? - Mauro Rocco
Are you running "celery -A myproject worker" at all? beat does not run any tasks by itself. - tuomur
@tuomur I updated my Procfile with 'worker: celery -A myproject beat' and scaled up the worker dyno in Heroku. - Joe Fusaro
@MauroRocco to be honest I'm not 100% sure how to answer your question. When I read the Celery docs, I thought I saw somewhere that RabbitMQ was the default. As mentioned in previous comment, I have a Heroku dyno running as a worker and I guess I misunderstood that RabbitMQ, as the default, knew how to route the accumulated tasks. Maybe there is some RabbitMQ or Celery documentation that I need to study further? - Joe Fusaro
Hi, Celery works on top of AMPQ protocol this means that it's publishing message on an exchange that is bind to a queue or more. By default celery uses a queue and an exchange named "celery". The best way to understand why your task are not being executed is to check this queues on RabbitMQ and see if messages are still stuck there. From rabbitMQ you can also see if there are consumer connected this will help you understand if you have a worker connected but maybe to the wrong queue. Enable this on your rabbitMQ rabbitmq.com/management.html to check all this stuffs - Mauro Rocco

1 Answers

0
votes

Special thanks to @MauroRocco for pushing me in the right direction on this. The pieces that I was missing were best explained in this tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-python.html

Note: I needed to modify some of the code in the tutorial to use URLParameters, passing in the resource URL defined in my settings file.

The only line in send.py and receive.py is:

connection = pika.BlockingConnection(pika.URLParameters(BROKER_URL))

and of course we need to import the BROKER_URL variable from settings.py

from settings import BROKER_URL

settings.py

BROKER_URL = 'amqp://VflhnMEP:8wGLOrNBP...4.bigwig.lshift.net:10791/sdklsfssd'

send.py

import pika
from settings import BROKER_URL

connection = pika.BlockingConnection(pika.URLParameters(BROKER_URL))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

receive.py

import pika
from settings import BROKER_URL

connection = pika.BlockingConnection(pika.URLParameters(BROKER_URL))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()