The Airflow docs suggest that a basic sanity check for a DAG file is to run it through the Python interpreter, i.e.:
$ python ~/path/to/my/dag.py
I've found this to be useful. However, I've now created a plugin, MordorOperator, under $AIRFLOW_HOME/plugins:
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators import BaseOperator
from airflow.exceptions import AirflowException
import pika
import json

class MordorOperator(BaseOperator):
    JOB_QUEUE_MAPPING = {"testing": "testing"}

    @apply_defaults
    def __init__(self, job, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # stuff

    def execute(self, context):
        # stuff

class MordorPlugin(AirflowPlugin):
    name = "MordorPlugin"
    operators = [MordorOperator]
I can import the plugin and see it work in a sample DAG:
from airflow import DAG
from airflow.operators import MordorOperator
from datetime import datetime
dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)
hello_operator = MordorOperator(job="testing", task_id='run_single_task', dag=dag)
However, when I run this file through the interpreter directly, I get a failure that I don't think I should get, since the plugin itself runs successfully. My suspicion is that Airflow does some dynamic code generation at runtime, and that the generated names aren't available when a DAG file is interpreted by itself. PyCharm also can't offer any autocompletion when I import the plugin. The failure looks like this:
(venv) 3:54PM /Users/paymahn/solvvy/scheduler mordor.operator ✱
❮❮❮ python dags/mordor_test.py
section/key [core/airflow-home] not found in config
Traceback (most recent call last):
  File "dags/mordor_test.py", line 2, in <module>
    from airflow.operators import MordorOperator
ImportError: cannot import name 'MordorOperator'
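To make the suspicion above concrete, here is a toy sketch of the kind of mechanism I have in mind. It uses only the standard library and is not Airflow's actual plugin loader: the module name fake_operators, the function load_plugins, and the stand-in MordorOperator class are all made up for illustration. If something like this is what happens, it would explain both symptoms: the import only works in a process where the loader has already run, and a static analyser such as PyCharm has nothing on disk to inspect.

```python
import importlib
import sys
import types


class MordorOperator:
    """Stand-in for the custom operator; deliberately not an Airflow class."""


def load_plugins(namespace, operators):
    """Hypothetical loader: attach each operator class to a module at runtime."""
    module = sys.modules.setdefault(namespace, types.ModuleType(namespace))
    for operator in operators:
        setattr(module, operator.__name__, operator)
    return module


def try_import(namespace, name):
    """Mimic `from <namespace> import <name>` and report whether it works."""
    try:
        module = importlib.import_module(namespace)
        getattr(module, name)
    except (ImportError, AttributeError):
        return False
    return True


# Before the loader runs, the name cannot be imported at all.
before = try_import("fake_operators", "MordorOperator")

# After the loader runs, the same import succeeds, but only in this process.
load_plugins("fake_operators", [MordorOperator])
after = try_import("fake_operators", "MordorOperator")

print(before, after)  # prints: False True
```

Whether Airflow really works this way is exactly what I'm unsure about; the sketch only shows that a name injected at runtime behaves the way I'm seeing.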
How can a DAG using a plugin be sanity tested? Is it possible to get PyCharm to give autocompletion for the custom operator?