
I followed this tutorial in an attempt to build an Airflow cluster on localhost with my own DAGs. When I ran airflow scheduler after setting executor = CeleryExecutor in the config file, I received the following traceback:

Traceback (most recent call last):
  File "/home/yurii/Tools/anaconda3/bin/airflow", line 28, in <module>
    args.func(args)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 839, in scheduler
    job.run()
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1441, in _execute_helper
    self.executor.heartbeat()
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 124, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 80, in execute_async
    args=[command], queue=queue)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 573, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 354, in send_task
    reply_to=reply_to or self.oid, **options
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py", line 310, in publish_task
    **kwargs
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 449, in _ensured
    return fun(*args, **kwargs)
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 188, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/__init__.py", line 122, in basic_publish
    mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)

Some additional information:

  • I am using Airflow 1.8.0 along with Celery 3.1.25 and RabbitMQ 3.5.7 as a broker and backend, but I have also tried Airflow 1.9.0 with Celery 4.2.
  • Airflow with sequential executor works without any problems.
  • `airflow test "dag_name" "task_name" "exec_date"` runs successfully.

I am new to Airflow/Celery/RabbitMQ/SQL, so any help would be appreciated!

2 Answers

3
votes

It seems you are using librabbitmq as the AMQP client library, which is not recommended by the Celery core team. Use py-amqp to talk to RabbitMQ instead and this error should go away.
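
To confirm which AMQP client libraries are present in the environment that runs the scheduler, something like the following may help. It is only a sketch: the helper name `installed_amqp_clients` is made up for illustration, and it only reports what is importable, not what Celery ends up choosing.

```python
import importlib.util


def installed_amqp_clients(candidates=("librabbitmq", "amqp")):
    """Return the candidate modules that can be imported in this environment.

    "librabbitmq" is the C client; "amqp" is the import name of py-amqp.
    The helper itself is hypothetical and not part of Airflow or Celery.
    """
    return [
        name for name in candidates
        if importlib.util.find_spec(name) is not None
    ]


if __name__ == "__main__":
    # Run this with the same interpreter that runs `airflow scheduler`.
    print(installed_amqp_clients())
```

If `librabbitmq` shows up in the list, it is a likely candidate for the client that produced the traceback in the question.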

4
votes

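To add to the previous answer: switching to py-amqp means either changing broker_url = amqp://XXXXX to broker_url = pyamqp://XXXXX, or running pip uninstall librabbitmq. With the amqp:// scheme, librabbitmq is used when it is installed and py-amqp is only the fallback, which is why either change works.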
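
As a rough illustration of the first option, the scheme change is a plain prefix swap. The function name `force_pyamqp` and the example URL (guest credentials on localhost) are assumptions for the sketch, not values taken from the question.

```python
def force_pyamqp(broker_url):
    """Rewrite an amqp:// broker URL to pyamqp://; leave other URLs untouched."""
    prefix = "amqp://"
    if broker_url.startswith(prefix):
        return "pyamqp://" + broker_url[len(prefix):]
    return broker_url


if __name__ == "__main__":
    # Hypothetical URL; substitute the broker_url from your own airflow.cfg.
    print(force_pyamqp("amqp://guest:guest@localhost:5672//"))
```

The resulting value goes into broker_url in airflow.cfg, and the scheduler and workers need a restart to pick it up.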

Additionally, you may need to rename the celery_result_backend variable to result_backend in your airflow.cfg. In recent versions, the celery_ prefix has been removed from variables in the [celery] section of airflow.cfg.
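
The rename can be sketched with the standard-library `configparser`. This is an illustration only: the helper `rename_result_backend` and the sample values are hypothetical, and because `configparser` drops comments when it writes a file back, editing airflow.cfg by hand is probably the safer route for a real installation.

```python
import configparser


def rename_result_backend(cfg_text):
    """Parse cfg_text and rename [celery] celery_result_backend to result_backend."""
    # interpolation=None so that '%' characters in values are left alone.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if parser.has_section("celery") and parser.has_option("celery", "celery_result_backend"):
        value = parser.get("celery", "celery_result_backend")
        parser.remove_option("celery", "celery_result_backend")
        # Do not overwrite a value that is already set under the new name.
        if not parser.has_option("celery", "result_backend"):
            parser.set("celery", "result_backend", value)
    return parser


# Hypothetical config fragment; the values are placeholders.
sample = """
[celery]
broker_url = pyamqp://XXXXX
celery_result_backend = db+sqlite:///example.db
"""

if __name__ == "__main__":
    updated = rename_result_backend(sample)
    for key, value in updated.items("celery"):
        print(key, "=", value)
```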