
I am trying to test a simple Celery app.

config.py
extract/
    celery.py
    celeryconfig.py
    tasks.py
playlists/
    playlist.json

config.py

from os import environ

# Configs
REDIS_HOST = "0.0.0.0"
REDIS_PORT = 6379
BROKER_URL = environ.get('REDIS_URL', "redis://{host}:{port}/0".format(
    host=REDIS_HOST, port=str(REDIS_PORT)))
CELERY_RESULT_BACKEND = BROKER_URL

celery.py

from __future__ import absolute_import    
from celery import Celery
from . import celeryconfig

app = Celery('extract')
app.config_from_object(celeryconfig)
app.autodiscover_tasks(['extract'], force=True)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py

The tasks are basically an extraction queue, like so:

@app.task(queue='extraction')
def playlist_generator_with_audio_features(username):

    playlist_ids = get_user_playlists(username)

    api = get_api_client()

    (...) # some more code

    export_playlist_with_audio_features.delay(
        username=username, playlist_id=playlist_id, artist=artist, track=track,
        popularity=popularity, energy=energy, liveness=liveness, tempo=tempo,
        speechiness=speechiness, acousticness=acousticness,
        instrumentalness=instrumentalness, time_signature=time_signature,
        danceability=danceability, key=key, duration_ms=duration_ms,
        loudness=loudness, valence=valence, mode=mode, uri=uri, preview=preview)

    logger.info('Got audio features from {} track(s) in playlist id:{} by username {}'.format(
        len(tracks_and_features), playlist_id, username))

    return True

and a writer queue, like so:

@app.task(queue='writer')
def export_playlist_with_audio_features(username, playlist_id, artist, track, popularity, 
                                        energy, liveness,tempo, speechiness, acousticness,
                                        instrumentalness, time_signature, danceability,
                                        key, duration_ms, loudness, valence, mode,
                                        uri, preview):

    path = 'playlists/playlist.json'

    obj = json.dumps({
        'username':username,
        'playlist_id': playlist_id,
        'artist': artist,
        'track': track,
        'popularity': popularity,
        'energy': energy,
        'liveness': liveness,
        'tempo': tempo,
        'speechiness': speechiness,
        'acousticness': acousticness,
        'instrumentalness': instrumentalness,
        'time_signature': time_signature,
        'danceability': danceability,
        'key': key,
        'duration_ms': duration_ms,
        'loudness': loudness,
        'valence': valence,
        'mode': mode,
        'uri': uri,
        'preview': preview
    })

    # use a context manager so the file handle is closed after each append
    with codecs.open(path, 'a', encoding='utf-8') as f:
        f.write(obj + '\n')

    logger.debug('Exported track {} by {} with audio features. Playlist owner: {}'.format(
                                                                    track, artist, username))

    return True
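Since each writer-task invocation appends one JSON object per line, playlist.json ends up in JSON Lines form and can be read back line by line. A minimal sketch (the two-line sample below is hypothetical, not real output):

```python
import json
import io

# Hypothetical sample of what two writer-task invocations might append.
sample = (
    '{"username": "u1", "track": "Song A", "artist": "X"}\n'
    '{"username": "u1", "track": "Song B", "artist": "Y"}\n'
)

# Parse one JSON object per line, skipping blanks.
records = [json.loads(line) for line in io.StringIO(sample) if line.strip()]
print(len(records))          # 2
print(records[0]['track'])   # Song A
```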

celeryconfig.py

from __future__ import absolute_import, unicode_literals
import os
from celery.schedules import crontab

INSTALLED_APPS = ['app'] # include app
CELERY_TASK_RESULT_EXPIRES = 30
CELERY_TIMEZONE = 'UTC'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_IMPORTS = ('extract.tasks',)  # note the trailing comma: without it this is a plain string

DEFAULT_URL = 'redis://localhost:6379/0'

BROKER_URL = os.environ.get('EXTRACT_BROKER_URL', DEFAULT_URL)
CELERY_RESULT_BACKEND = os.environ.get('EXTRACT_RESULT_BACKEND',
                                       DEFAULT_URL)

CELERYBEAT_SCHEDULE = {
    'playlist_generator_with_audio_features': {
        'task': 'extract.tasks.playlist_generator_with_audio_features',
        # Every minute
        'schedule': crontab(minute="*"),
        'args': [1],  # note: [(1),] is the same as [1] — parentheses alone don't make a tuple
    },
    },
    'export_playlist_with_audio_features': {
        'task': 'extract.tasks.export_playlist_with_audio_features',
        # Every minute
        'schedule': crontab(minute="*"),
        'kwargs': {"username": "username",
                   "playlist_id": "playlist_id",
                   "artist": "artist",
                   "track": "track",
                   "popularity": "popularity",
                   "energy": "energy",
                   "liveness": "liveness",
                   "tempo": "tempo",
                   "speechiness": "speechiness",
                   "acousticness": "acousticness",
                   "instrumentalness": "instrumentalness",
                   "time_signature": "time_signature",
                   "danceability": "danceability",
                   "key": "key",
                   "duration_ms": "duration_ms",
                   "loudness": "loudness",
                   "valence": "valence",  # was "valance": that typo makes the task fail with an unexpected keyword argument
                   "mode": "mode",
                   "uri": "uri",
                   "preview": "preview"},
    },
}
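As an aside on settings like CELERY_IMPORTS: in Python, parentheses alone do not create a tuple; only the trailing comma does, so ('extract.tasks') is just a string. Celery may tolerate the string form, but the tuple form is unambiguous:

```python
# Parentheses alone just group an expression; the comma makes the tuple.
a = ('extract.tasks')   # a plain parenthesized string
b = ('extract.tasks',)  # a one-element tuple

print(type(a).__name__)  # str
print(type(b).__name__)  # tuple
```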

The Celery workers work fine on their own, i.e. the tasks dump JSON-serialized data into playlists/, when started with the following commands:

$ celery worker -A extract -l info -Q extraction

[2018-12-16 23:31:58,075: WARNING/MainProcess] celery@Vitors-MacBook-Pro-2.local ready.
[2018-12-16 23:32:29,494: INFO/MainProcess] Received task: extract.tasks.playlist_generator_with_audio_features[0713b788-c004-4c0c-b1ee-85068287856a]
[2018-12-16 23:32:30,892: INFO/Worker-4] extract.tasks.get_user_playlists[None]: Playlist name: Discover Weekly, Number of songs: 30, Playlist ID: 37i9dQZEVXcRI2aS94C6hY 
[2018-12-16 23:32:30,893: INFO/Worker-4] extract.tasks.get_user_playlists[None]: Playlist name: Time, Number of songs: 10, Playlist ID: 1qxOGf3xD3fDQs03kwKteg 
[2018-12-16 23:32:30,893: INFO/Worker-4] extract.tasks.get_user_playlists[None]: Playlist name: Vocoder, Number of songs: 280, Playlist ID: 7nROSBznyIkVgjSdNaHDxm 

and

$ celery worker -A extract -l info -Q writer

[2018-12-16 23:32:14,198: WARNING/MainProcess] celery@Vitors-MacBook-Pro-2.local ready.
[2018-12-16 23:32:32,467: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[a73f776d-2412-4aed-8e84-50c6485ebac8]
[2018-12-16 23:32:32,470: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[930fb552-5687-48ce-b61d-b573a79aa13c]
[2018-12-16 23:32:32,479: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[162a77a1-20ba-48c3-86c3-bb9c51eeed75]
[2018-12-16 23:32:32,486: INFO/MainProcess] Received task: extract.tasks.export_playlist_with_audio_features[89eff015-e692-4af0-a7a8-dcad06dc2e3f]

PROBLEM:

But when I try to use celery beat to schedule the tasks, only the first round of tasks is picked up by the workers, NOT periodically afterwards.

celery beat -A app.celery --schedule=/tmp/celerybeat-schedule --loglevel=INFO --pidfile=/tmp/celerybeat.pid

Configuration ->
    . broker -> redis://localhost:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> /tmp/celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2018-12-16 23:32:32,706: INFO/MainProcess] beat: Starting...
[2018-12-16 23:32:32,736: INFO/MainProcess] Scheduler: Sending due task export_playlist_with_audio_features (tasks.export_playlist_with_audio_features)
[2018-12-16 23:32:32,743: INFO/MainProcess] Scheduler: Sending due task playlist_generator_with_audio_features (tasks.playlist_generator_with_audio_features)
[2018-12-16 23:33:00,000: INFO/MainProcess] Scheduler: Sending due task playlist_generator_with_audio_features (tasks.playlist_generator_with_audio_features)

What am I missing here? Thanks.

Did you try starting beat with -A extract instead of app.celery? - Legato

Yes, it does not work: the task is sent but not received by the workers. Starting with the command used in the question, the first task is sent and received by the workers, but the following periodic tasks are only sent, not received. - 8-Bit Borges

I'm wondering if you need to specify the queue name in the periodic task options. I have a hunch that the beat scheduler is using a different queue. Try running with log level debug and see if that yields any more useful information. - sytech

It's bad practice to name your modules after library names; in your case, celery.py. At the least, you won't be able to import celery from another module in that directory, and it can also break your app and raise errors. - Vladimir Oprya

1 Answer

@app.task(queue='extraction')

The options here will not be used by beat. Use the @periodic_task decorator to have beat and the regular celery client use a particular queue.

@periodic_task(run_every=crontab(minute='5'),
               queue='celery_periodic',
               options={'queue': 'celery_periodic'})
  • queue='celery_periodic' is used when you invoke the task from code (.delay or .apply_async)

  • options={'queue': 'celery_periodic'} is used when celery beat invokes it.

You can also provide beat options in the schedule config like

CELERYBEAT_SCHEDULE = {
    'playlist_generator_with_audio_features': {
        'task': 'extract.tasks.playlist_generator_with_audio_features',
        'schedule': crontab(minute="*"),
        'options': {'queue': 'celery_periodic'}  # route beat's task to this queue
    },
}
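Whichever queue name you pick in options, a worker also has to consume it, mirroring the worker commands in the question (celery_periodic here is just the example name from above):

```shell
# Start a dedicated worker consuming the queue that beat sends to.
celery worker -A extract -l info -Q celery_periodic
```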