
Here is the setup: a Django project with Celery, and a CloudAMQP RabbitMQ instance doing the message brokering.

My Celery/RabbitMQ settings:

# RabbitMQ & Celery settings
BROKER_URL = 'amqp://guest:guest@localhost:5672/' # Understandably fake
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = 30
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'

A docker container running celery with the following command:

bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3'

The shared_task definition:

from __future__ import absolute_import

import logging

from celery import shared_task

@shared_task
def push_notification(user_id, message):
    logging.critical('Push notifications sent')
    return {'status': 'success'}

And here is where I actually call it when something happens (I have omitted some of the code because it does not seem to be relevant):

from notifications.tasks import push_notification

def like_this(self, **args):
    # Do like stuff and then do .delay()
    push_notification.delay(media.user.id, request.user.username + ' has liked your item')

When this runs, everything seems fine and dandy. The output looks like this:

worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516]
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'}

So from what I gather, the task has run and executed properly, so the messages should stop and RabbitMQ should go quiet.

But in my RabbitMQ Management I see messages getting published and delivered non-stop:

[Screenshot: RabbitMQ Admin GUI]

So what I'm gathering from this is that RabbitMQ is trying to send some sort of confirmation and failing and retrying? Is there a way to actually turn this behavior off?

All help and advice is warmly welcomed.

EDIT: Forgot to mention something important: until I call push_notification.delay(), the message tab is empty save for the heartbeat that comes and goes every 30 seconds. Only after I have called .delay() does this happen.

EDIT 2: CELERYBEAT_SCHEDULE settings (I've tried running with and without them and there was no difference, but I'm adding them just in case)

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "minutely_process_all_notifications": {
        'task': 'transmissions.tasks.process_all_notifications',
        'schedule': crontab(minute='*')
    }
}

EDIT 3: Added View code. Also I'm not using the CELERYBEAT_SCHEDULE. I'm just keeping the config in the code for future scheduled tasks

from notifications.tasks import push_notification

class MediaLikesView(BaseView):
    def post(self, request, media_id):
        media = self.get_object(media_id)
        data = {}
        data['media'] = media.id
        data['user'] = request.user.id
        serializer = MediaLikeSerializer(data=data)
        if serializer.is_valid():
            like = serializer.save()
            push_notification.delay(media.user.id, request.user.username + ' has liked your item')
            serializer = MediaGetLikeSerializer(like)
            return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT)
        return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
Could you please post the code where you call the method like_this, and the value you have for CELERYBEAT_SCHEDULE, if any? - trinchet
Hey. I've added the schedule. The method itself is inside a post method in a Django Rest Framework view class. Not sure if that would be helpful but I can toss it in there too. - Boris Gaganelov
Where is the method process_all_notifications? I mean, it should be in transmissions.tasks, but I don't see this code. Could you please add it as well? - trinchet
Please add the code of the view as well - trinchet
There is no such method. It's just placeholder settings for the future if I ever want to add a CELERYBEAT and do crons. - Boris Gaganelov

1 Answer


It's Celery's mingle and gossip features. Disable them by adding --without-gossip --without-mingle --without-heartbeat to the worker's command line arguments.
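
As a rough sketch of what that means for the worker command in the question, the snippet below appends the three flags to the original arguments and prints the resulting command line. The variable names (base_cmd, quiet_flags, worker_cmd) are made up for illustration; only the flags and the original command come from this thread.

```python
import shlex

# Worker command from the question, split into arguments.
base_cmd = ["celery", "-A", "pkm_main", "worker", "-E", "-l", "info", "--concurrency=3"]

# The three flags suggested in this answer.
quiet_flags = ["--without-gossip", "--without-mingle", "--without-heartbeat"]

# Full argument list for the quieter worker.
worker_cmd = base_cmd + quiet_flags

# Print it as a shell-safe command line to paste into the docker command.
print(" ".join(shlex.quote(arg) for arg in worker_cmd))
```

The printed line would replace the celery part of the bash -c '...' string in the container command.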

Also, don't forget to set BROKER_HEARTBEAT = None once you've disabled heartbeats on the command line; otherwise you'll be disconnected after 30s. It's usually better to rely on TCP keepalive than on AMQP heartbeats or, even worse, Celery's own heartbeats.
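
A minimal sketch of the settings change, using the same old-style setting name as the question. The socket lines are only a generic standard-library illustration of what "TCP keepalive" refers to at the socket level. They are an assumption added for clarity, not how Celery or its AMQP client configures its own connection.

```python
import socket

# Django settings: AMQP heartbeats off (the question had BROKER_HEARTBEAT = 30).
BROKER_HEARTBEAT = None

# Generic illustration only: enabling TCP keepalive on a plain socket.
# The operating system then probes idle connections on its own schedule.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

# Read the option back; a non-zero value means keepalive is enabled.
keepalive_on = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
sock.close()
```

Only the BROKER_HEARTBEAT line belongs in the Django settings; the rest is there to show what the keepalive option is.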