I am new to Airflow. I'm trying to set up Airflow in distributed mode using the Celery executor, following this article: https://stlong0521.github.io/20161023%20-%20Airflow.html
Before getting into the details of the setup, I would like to mention that I've installed PostgreSQL on a separate instance.
The specification of the setup is detailed below:
Airflow core/server computer
- Python 3.5
- airflow (AIRFLOW_HOME = ~/airflow)
- celery
- psycopg2
- RabbitMQ
Configurations made in airflow.cfg:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
Tests performed:
RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)
Airflow worker computer
Has the following installed:
- Python 3.5
- airflow (AIRFLOW_HOME = ~/airflow)
- celery
- psycopg2
Configurations made in airflow.cfg are exactly the same as on the server:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.2.12:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.12:5672//
celery_result_backend = db+postgresql://username:password@192.168.2.12:5432/airflow
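Since the broker (192.168.1.12) and the database (192.168.2.12) are on different hosts, I double-checked the URIs on both machines with a quick stdlib parse to rule out typos (the credentials and IPs here are placeholders from my setup):

```python
# Sanity check: confirm the broker and result-backend URIs in both
# airflow.cfg files point at the hosts and ports I expect.
from urllib.parse import urlsplit

broker_url = "amqp://username:password@192.168.1.12:5672//"
result_backend = "db+postgresql://username:password@192.168.2.12:5432/airflow"

for name, uri in [("broker", broker_url), ("result backend", result_backend)]:
    parts = urlsplit(uri)
    print(name, "->", parts.hostname, parts.port)
# broker -> 192.168.1.12 5672
# result backend -> 192.168.2.12 5432
```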
Output from commands run on the worker machine:
When running airflow flower:
[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:password@192.168.1.12:5672//
[I 180219 14:58:15 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:password@192.168.1.12:5672//
I have placed the DAG on the Airflow core machine, and I have also copied the sample data (Excel sheets) that the DAG will process to the same core machine.
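In case it's relevant: my understanding is that if the worker does need the DAG files, they would have to sit under its own ~/airflow/dags. Here is a minimal local sketch of mirroring a dags folder (in practice the copy would go over the network, e.g. with rsync or scp; the function name and paths are just illustrative):

```python
# Minimal sketch: mirror the .py files of a dags folder into another
# directory, so every machine sees the same DAG definitions.
import shutil
from pathlib import Path

def mirror_dags(src: str, dst: str) -> list:
    """Copy every .py file from src to dst, returning the copied filenames."""
    src_dir, dst_dir = Path(src), Path(dst)
    dst_dir.mkdir(parents=True, exist_ok=True)
    copied = []
    for dag_file in sorted(src_dir.glob("*.py")):
        shutil.copy2(dag_file, dst_dir / dag_file.name)
        copied.append(dag_file.name)
    return copied
```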
My worker log shows:
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
Now my questions are:
1) Should I copy the DAG folder to the worker computer as well?
2) Right now I have not copied the DAG folder to the worker computer, and I can't see the worker process pick up the task.
Please point out where I am making a mistake and how to get the worker process to pick up the task.