Here is the setup: a Django project with Celery, and a CloudAMQP RabbitMQ instance doing the message brokering.
My Celery/RabbitMQ settings:
# RabbitMQ & Celery settings
BROKER_URL = 'amqp://guest:guest@localhost:5672/' # Understandably fake
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 30
BROKER_HEARTBEAT = 30
CELERY_SEND_EVENTS = False
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
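Since a transposed scheme such as `ampq` is easy to miss in a broker URL, here is a minimal sketch of a sanity check using only the standard library. The helper name `check_broker_url` and the set of accepted schemes are assumptions for illustration, not part of Celery or of my project.

```python
from urllib.parse import urlsplit

# Schemes accepted by this sketch (an assumption; extend for other transports).
KNOWN_AMQP_SCHEMES = {"amqp", "amqps"}


def check_broker_url(url):
    """Return (ok, scheme) for an AMQP-style broker URL (hypothetical helper)."""
    scheme = urlsplit(url).scheme.lower()
    return scheme in KNOWN_AMQP_SCHEMES, scheme


if __name__ == "__main__":
    for candidate in ("amqp://guest:guest@localhost:5672/",
                      "ampq://guest:guest@localhost:5672/"):
        ok, scheme = check_broker_url(candidate)
        print(candidate, "->", "ok" if ok else "unexpected scheme %r" % scheme)
```

This only inspects the URL string; it does not open a connection to the broker.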
A docker container running celery with the following command:
bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3'
The shared_task definition:
from __future__ import absolute_import

import logging

from celery import shared_task

@shared_task
def push_notification(user_id, message):
    # Placeholder body: only logs and reports success.
    logging.critical('Push notifications sent')
    return {'status': 'success'}
And me actually calling it when something happens (I have omitted some of the code because it does not seem to be relevant):
from notifications.tasks import push_notification

def like_this(self, **args):
    # Do like stuff and then do .delay()
    push_notification.delay(media.user.id, request.user.username + ' has liked your item')
So when this is run, everything seems fine and dandy. The output looks like this:
worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516]
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'}
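To check that each task is received and completed exactly once (rather than being redelivered), here is a minimal sketch that counts "Received" and "succeeded" lines per task id. It assumes the log format shown above; the function name `count_task_events` is hypothetical.

```python
import re
from collections import Counter

# Patterns follow the worker output format quoted above (assumed stable).
RECEIVED = re.compile(r"Received task: (?P<name>[\w.]+)\[(?P<id>[0-9a-f-]+)\]")
SUCCEEDED = re.compile(r"Task (?P<name>[\w.]+)\[(?P<id>[0-9a-f-]+)\] succeeded")


def count_task_events(lines):
    """Return two Counters keyed by task id: (received, succeeded)."""
    received, succeeded = Counter(), Counter()
    for line in lines:
        match = RECEIVED.search(line)
        if match:
            received[match.group("id")] += 1
            continue
        match = SUCCEEDED.search(line)
        if match:
            succeeded[match.group("id")] += 1
    return received, succeeded


if __name__ == "__main__":
    import sys
    received, succeeded = count_task_events(sys.stdin)
    for task_id, count in received.items():
        print(task_id, "received:", count, "succeeded:", succeeded[task_id])
```

If every id shows one receive and one success, the task itself is not being retried, so the extra traffic would have to come from somewhere else.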
So from what I gather, the task has run and completed properly, so the messages should stop and RabbitMQ should go quiet.
But in my RabbitMQ Management I see messages getting published and delivered non-stop:
So what I'm gathering from this is that RabbitMQ is trying to send some sort of confirmation and failing and retrying? Is there a way to actually turn this behavior off?
All help and advice is warmly welcomed.
EDIT: Forgot to mention something important: until I call push_notification.delay(), the message tab is empty save for the heartbeat that comes and goes every 30 seconds. Only after I have called .delay() does this happen.
EDIT 2: CELERYBEAT_SCHEDULE settings (I've tried running with and without them - there was no difference but adding them just in case)
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "minutely_process_all_notifications": {
        'task': 'transmissions.tasks.process_all_notifications',
        'schedule': crontab(minute='*')
    }
}
EDIT 3: Added View code. Also I'm not using the CELERYBEAT_SCHEDULE. I'm just keeping the config in the code for future scheduled tasks
from notifications.tasks import push_notification

class MediaLikesView(BaseView):
    def post(self, request, media_id):
        media = self.get_object(media_id)
        data = {}
        data['media'] = media.id
        data['user'] = request.user.id
        serializer = MediaLikeSerializer(data=data)
        if serializer.is_valid():
            like = serializer.save()
            push_notification.delay(media.user.id, request.user.username + ' has liked your item')
            serializer = MediaGetLikeSerializer(like)
            return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT)
        return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

like_this? and the value you have for CELERYBEAT_SCHEDULE if any? - trinchet
process_all_notifications? I mean, it should be at transmissions.tasks but I don't see this code, could you please add it as well? - trinchet