0
votes

The airflow docs suggest that a basic sanity check for a DAG file is to interpret it. ie:

$ python ~/path/to/my/dag.py

I've found this to be useful. However, now I've created a plugin, MordorOperator under $AIRFLOW_HOME/plugins:

from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators import BaseOperator
from airflow.exceptions import AirflowException
import pika
import json


class MordorOperator(BaseOperator):
    JOB_QUEUE_MAPPING = {"testing": "testing"}

    @apply_defaults
    def __init__(self, job, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # stuff


    def execute(self, context):
        # stuff


class MordorPlugin(AirflowPlugin):
    name = "MordorPlugin"
    operators = [MordorOperator]

I can import the plugin and see it work in a sample DAG:

from airflow import DAG
from airflow.operators import MordorOperator
from datetime import datetime


dag = DAG('mordor_dag', description='DAG with a single task', start_date=datetime.today(), catchup=False)

hello_operator = MordorOperator(job="testing", task_id='run_single_task', dag=dag)

However, when I try to interpret this file I get failures which I suspect I shouldn't get since the plugin successfully runs. My suspicion is that this is because there's some dynamic code gen happening at runtime which isn't available when a DAG is interpreted by itself. I also find that PyCharm can't perform any autocompletion when importing the plugin.

(venv)  3:54PM /Users/paymahn/solvvy/scheduler mordor.operator ✱
 ❮❮❮ python dags/mordor_test.py
section/key [core/airflow-home] not found in config
Traceback (most recent call last):
  File "dags/mordor_test.py", line 2, in 
    from airflow.operators import MordorOperator
ImportError: cannot import name 'MordorOperator'

How can a DAG using a plugin be sanity tested? Is it possible to get PyCharm to give autocompletion for the custom operator?

1
How can you tell the plugin runs successfully? You get the output you expect? Also, which version of Airflow are you running? FWIW if you get a 'cannot import' error in python it usually means there's a syntax error somewhere in your file, it may be due to a dependency it can't find.szeitlin
I know it's successful because I can run the task in airflow and can see the logs I expect. Yup, I get the output I expect. I'm running airflow 1.9.Paymahn Moghadasian

1 Answers

4
votes

I'm running airflow in a docker container and have a script which runs as the containers entry point. Turns out that the plugins folder wasn't available to my container when I was running my tests. I had to add a symlink in the container as part of the setup script. The solution to my problem is highly specific to me and if someone else stumbles upon this I don't have a good answer for your situation other than: make sure your plugins folder is correctly available.