15 votes

I am using Celery with RabbitMQ to process data from API requests. The process goes as follows:

Request > API > RabbitMQ > Celery Worker > Return

Ideally I would spawn more Celery workers, but I am restricted by memory constraints.

Currently, the bottleneck in my process is fetching and downloading the data from the URLs passed to the worker. Roughly, the process looks like this:

def celery_gets_job(url):
    data = fetches_url(url)       # takes 0.1s to 1.0s (bottleneck)
    result = processes_data(data) # takes 0.1s
    return result

This is unacceptable, as the worker is locked up while fetching the URL. I am looking at improving this through threading, but I am unsure of the best practices.

  • Is there a way for the Celery worker to download the incoming data asynchronously, while processing data in a different thread at the same time?

  • Should I have separate workers for fetching and for processing, with some form of message passing between them, possibly via RabbitMQ?

You could consider using multiprocessing pipes within the Celery task by creating two processes; of course, those processes should be restricted by a pool. Sharing the fetched URL's large data over RabbitMQ/the result backend would not be a good idea, if I am not wrong. Celery's low-level APIs can also offer similar functionality. – Sanket Sudake
I am not familiar with RabbitMQ, but I think multiprocessing will suit you better than multithreading, since celery_gets_job has multiple non-atomic operations that would create problems with multithreading. You could use a Queue populated by a pool of processes running fetches_url(url), with another process (or processes) carrying out processes_data(data); see the sketch after these comments. – shrishinde
This may be what you are looking for: stackoverflow.com/questions/28315657/… – fpbhb
This post news.ycombinator.com/item?id=11889549 by the creator of Celery may be what you are looking for. – dyeray
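
A minimal sketch of the Queue idea from shrishinde's comment above, with stub functions standing in for the question's fetches_url/processes_data (everything here is illustrative, not a drop-in solution):

import multiprocessing as mp

def fetches_url(url):
    # stub for the question's I/O-bound download (0.1s to 1.0s)
    return "data from %s" % url

def processes_data(data):
    # stub for the question's CPU-bound step (0.1s)
    return data.upper()

def fetcher(url, queue):
    # each fetcher process downloads one URL and hands the result off
    queue.put(fetches_url(url))

def consumer(queue, n_items):
    # a single process drains the queue and does the processing
    for _ in range(n_items):
        print(processes_data(queue.get()))

if __name__ == "__main__":
    urls = ["http://example.com/a", "http://example.com/b"]
    queue = mp.Queue()
    fetchers = [mp.Process(target=fetcher, args=(u, queue)) for u in urls]
    worker = mp.Process(target=consumer, args=(queue, len(urls)))
    for p in fetchers:
        p.start()
    worker.start()
    for p in fetchers:
        p.join()
    worker.join()

One caveat: Celery's default prefork pool runs tasks in daemonized child processes, which typically cannot spawn children of their own, so this pattern is easier to apply outside the task than inside it.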

2 Answers

3 votes

Using the eventlet library, you can patch the standard libraries to make them asynchronous.

First, import the green (asynchronous) urllib2:

from eventlet.green import urllib2

Then you can fetch the URL body with:

def fetch(url):
    # the green urllib2 yields to other green threads while waiting on I/O
    body = urllib2.urlopen(url).read()
    return body
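
Note that the green urllib2 only pays off if the worker itself runs on eventlet's execution pool instead of the default prefork pool. A minimal invocation, assuming your Celery app lives in a module named proj (the concurrency value is just an example):

pip install eventlet
celery -A proj worker --pool=eventlet --concurrency=100

With the eventlet pool, high concurrency is cheap because each task runs in a green thread, so many downloads can wait on the network at once inside a single worker process.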

See the eventlet documentation for more examples.

0 votes

I would create two tasks: one for downloading the data and another for processing it once downloaded. That way you can scale the two independently. See: Routing, Chains.
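
A minimal sketch of that split, assuming a Celery app pointing at a local RabbitMQ broker; the app name, queue names, and the stubbed fetches_url/processes_data helpers are all illustrative:

from celery import Celery, chain

app = Celery("proj", broker="amqp://localhost//")

# Route each task to its own queue so the two pools scale independently.
app.conf.task_routes = {
    "proj.fetch_data": {"queue": "fetch"},
    "proj.process_data": {"queue": "process"},
}

def fetches_url(url):
    # stub for the question's I/O-bound download
    return "data from %s" % url

def processes_data(data):
    # stub for the question's CPU-bound step
    return data.upper()

@app.task(name="proj.fetch_data")
def fetch_data(url):
    return fetches_url(url)

@app.task(name="proj.process_data")
def process_data(data):
    return processes_data(data)

# Chain the two so the download result feeds straight into the processor:
result = chain(fetch_data.s("http://example.com/item"), process_data.s())()

You can then start, say, a high-concurrency eventlet worker for the fetch queue and a small prefork worker for the process queue: celery -A proj worker -Q fetch -P eventlet -c 50 versus celery -A proj worker -Q process -c 2.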