I'm trying to create a web application that receives input from a POST request and returns ML predictions based on that input.

Since the prediction model is quite heavy, I don't want the user to wait for the calculation to complete. Instead, I delegated the heavy computation to a Celery task, and the user can inspect the result later.

I'm using a simple Flask application with Celery, Redis and Flower.

My view.py:

@ns.route('predict/')
class Predict(Resource):
    ...
    def post(self):
        ...
        do_categorize.delay(data)  # enqueue the task; a plain call would run it inline
        return jsonify(success=True)

My tasks.py file looks something like this:

from ai.categorizer import Categorizer
categorizer = Categorizer(
    model_path='category_model.h5',
    tokenizer_path='tokenize.joblib',
    labels_path='labels.joblib'
)


@task()
def do_categorize(data):
    result = categorizer.predict(data)
    print(result)
    # Write result to the DB
    ...

My predict() method inside the Categorizer class:

def predict(self, value):
    K.set_session(self.sess)
    with self.sess.as_default():
        with self.graph.as_default():
            prediction = self.model.predict(np.asarray([value], dtype='int64'))
            return prediction

I'm running Celery like this:

celery worker -A app.celery --loglevel=DEBUG

The problem I've been having for the last couple of days is that the categorizer.predict(data) call hangs in the middle of execution.

I tried running categorizer.predict(data) inside the post method, and it works. But if I place it inside a Celery task, it stops working. Nothing is logged to the console, and if I try to debug it, it just freezes on .predict().

My questions:

  • How can I solve this issue?
  • Is there any memory or CPU limit for the worker?
  • Are Celery tasks the "right" way to do such heavy computations?
  • How can I debug this problem? What am I doing wrong?
  • Is it correct to initialize models at the top of the file?

1 Answer

Thanks to this SO question I found the answer to my problem:

It turns out that Keras works better with a thread-based worker pool than with Celery's default process-based pool.

Luckily for me, Celery 4.4 recently reintroduced a thread-based task pool. You can read more in the Celery 4.4 changelog:

Threaded Tasks Pool

We reintroduced a threaded task pool using concurrent.futures.ThreadPoolExecutor.

The previous threaded task pool was experimental. In addition it was based on the threadpool package which is obsolete.

You can use the new threaded task pool by setting worker_pool to 'threads' or by passing --pool threads to the celery worker command.
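
The worker_pool setting mentioned in the changelog can go in a configuration module rather than on the command line. A minimal sketch, assuming a hypothetical celeryconfig.py that the app loads with something like app.config_from_object('celeryconfig'); the module name and the concurrency value are illustrative assumptions, not taken from the original post:

```python
# celeryconfig.py (hypothetical module name, loaded by the Celery app)

# Use the thread-based pool reintroduced in Celery 4.4
# instead of the default process-based pool.
worker_pool = "threads"

# Number of worker threads; 4 is an arbitrary illustrative value.
worker_concurrency = 4
```

With this in place, the --pool flag should not be needed when starting the worker.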

Now you can use threads instead of processes for pooling.

celery worker -A your_application --pool threads --loglevel=INFO
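
To picture what the thread pool changes: with threads, every task runs inside the same process and sees the same module-level object, so the model is loaded once and shared. The sketch below uses only the standard library's concurrent.futures.ThreadPoolExecutor (the class the changelog names) and a hypothetical StubCategorizer in place of the real Keras-backed Categorizer. It illustrates the sharing only; it does not reproduce the hang or show what causes it.

```python
from concurrent.futures import ThreadPoolExecutor


class StubCategorizer:
    """Hypothetical stand-in for the Keras-backed Categorizer (no real model)."""

    def predict(self, value):
        # Placeholder "prediction": just the length of the input.
        return len(value)


# Created once at import time, like `categorizer` at the top of tasks.py.
categorizer = StubCategorizer()


def do_categorize(data):
    # Every pool thread uses the very same object created above.
    return id(categorizer), categorizer.predict(data)


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(do_categorize, [[1, 2], [3], [4, 5, 6]]))

# All tasks saw one shared instance, regardless of which thread ran them.
object_ids = {obj_id for obj_id, _ in results}
predictions = [pred for _, pred in results]
```

Because the object is shared between threads, anything it holds (a session, a graph) has to tolerate concurrent calls; whether a given model does is something to verify separately.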

If you cannot use the latest Celery version, you can use the gevent package instead:

pip install gevent
celery worker -A your_application --pool gevent --loglevel=INFO