3 votes

I'm using RabbitMQ 3.6.0 and Celery 3.1.20 on a Windows 10 machine in a Django application. Everything is running on the same computer. I've configured Celery to acknowledge late (CELERY_ACKS_LATE = True), and now I'm getting connection problems.
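For reference, the relevant part of my Celery configuration in settings.py looks roughly like this (the broker URL is just the default local guest connection and is only illustrative):

# settings.py - relevant Celery settings (Celery 3.1 names)
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_ACKS_LATE = True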

I start the Celery worker, and after 50-60 seconds of handling tasks, each worker thread fails with the following message:

Couldn't ack ###, reason:ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)

(### is the number of the task)

When I look at the RabbitMQ logs I see this:

=INFO REPORT==== 10-Feb-2016::22:16:16 ===
accepting AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672)

=INFO REPORT==== 10-Feb-2016::22:16:16 ===
accepting AMQP connection <0.254.0> (127.0.0.1:55373 -> 127.0.0.1:5672)

=ERROR REPORT==== 10-Feb-2016::22:17:14 ===
closing AMQP connection <0.247.0> (127.0.0.1:55372 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}

The error occurs exactly when the Celery workers are getting their connection reset.

I thought this was an AMQP heartbeat issue, so I added BROKER_HEARTBEAT = 15 to my Celery settings, but it made no difference.

Comments:
Did you ever figure this out, by chance? – Jarad
In my case, I had the port at the end of the broker URL; removing the port resolved it. I think the port is handled automatically by Celery, and since the broker may be running on any of several ports depending on availability, hard-coding a port is not ideal. – Patrick Mutuku

2 Answers

5 votes

I was having a similar issue with Celery on Windows, with long-running tasks and concurrency=1. The following configuration finally worked for me:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

I also started the Celery worker with the -Ofair option:

celery -A test worker -l info -Ofair

In my limited understanding, CELERYD_PREFETCH_MULTIPLIER sets the number of messages that sit in the queue of a specific Celery worker; by default it is set to 4. If you set it to 1, each worker will consume only one message and complete that task before it consumes another message. I was having issues with long-running tasks because the connection to RabbitMQ was consistently lost in the middle of the long task, and the task was then re-attempted if any other messages/tasks were waiting in the Celery queue.
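To illustrate, here is a minimal sketch of the kind of long-running task I mean (module name, task name, and sleep time are illustrative). With CELERY_ACKS_LATE = True the message is only acknowledged after the task returns, so a dropped connection means the unacknowledged message is redelivered and the task runs again:

# tasks.py - minimal sketch of a long-running task
import time

from celery import Celery

app = Celery('test', broker='amqp://guest:guest@localhost//')
# Same settings as above, applied directly to the app for the sketch
app.conf.update(CELERY_ACKS_LATE=True, CELERYD_PREFETCH_MULTIPLIER=1)

@app.task
def long_task(seconds=120):
    # Simulate a long job; with late acks the message is acked only after this returns
    time.sleep(seconds)
    return seconds

Started the same way as above, e.g. celery -A tasks worker -l info -Ofair.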

The following option was also specific to my situation:

CELERYD_CONCURRENCY = 1

Setting concurrency to 1 made sense for me because my long-running tasks needed a large amount of RAM, so each of them needed to run solo.
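For anyone on Celery 4.x, the equivalent lowercase setting names should be roughly as follows (I have only verified the Celery 3.1 names above):

task_acks_late = True
worker_prefetch_multiplier = 1
worker_concurrency = 1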

1 vote

@bbaker's solution with CELERY_ACKS_LATE (which is task_acks_late in Celery 4.x) did not work for me by itself. My workers are in Kubernetes pods, must be run with --pool solo, and each task takes 30-60 seconds.

I solved it by also including broker_heartbeat = 0. My full configuration:

broker_pool_limit = None
task_acks_late = True
broker_heartbeat = 0
worker_prefetch_multiplier = 1
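For reference, this is roughly how those settings are applied to the app object (module name, app name, and broker URL are illustrative):

# celery_app.py - apply the settings above to the Celery app
from celery import Celery

app = Celery('myapp', broker='amqp://guest:guest@rabbitmq//')

app.conf.update(
    broker_pool_limit=None,
    task_acks_late=True,
    broker_heartbeat=0,
    worker_prefetch_multiplier=1,
)

The worker is then started with the solo pool, e.g. celery -A celery_app worker --pool solo -l info.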