I'm trying to import a script into many DAGs so I can call the same operations from each of them. What is the best way to implement this kind of shared code?

Right now I have a folder structure as:

dags/
|-- some_dags_folder/
|---- some_dag.py
|-- other_dags_folder/
|---- another_dag.py
|-- utils/
|---- util_slack.py

When I try to import the util_slack file, I place the following into the DAG code (for this example, suppose the code is in some_dag.py):

from ..utils.util_slack import some_function

After placing everything inside Airflow I get the following error:

Broken DAG: [/usr/local/airflow/dags/some_dags_folder/some_dag.py] attempted relative import with no known parent package

The util_slack script is a file made to send either a success message or a failure message, and it looks like this:

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.hooks.base_hook import BaseHook

CHANNEL = BaseHook.get_connection('Slack').login
TOKEN = BaseHook.get_connection('Slack').password

def slack_success(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execute(context=context)

def slack_fail(context):
    ...
    alterHook = SlackWebhookOperator(...)
    return alterHook.execute(context=context)

The idea is that I can import the util_slack module, or any other module I write myself, into multiple DAGs and invoke the function I need like this:

...
from ..utils.util_slack import slack_success

...

def task_success(context):
    return slack_success(context)
...
some_task_in_dag = SSHOperator(
    ...
    on_success_callback=task_success
    ...)

Is this the best approach, or is it better to create custom plugins like the ones shown at https://airflow.apache.org/plugins.html?

1 Answer

Not sure if plugins are a good approach in your case. Plugins integrate external features into the Airflow core (such as custom endpoints, custom login/auth, etc.).

Below is my approach. At the moment I have a lot of tasks that work with ClickHouse, so I need connect/truncate/insert/copy/etc. logic in different DAGs. Structure example:

dags
├── lib  # you can choose any name you like (utils, tools, etc.)
│   ├── ...  # just another common package / module
│   ├── default.py
│   ├── configurator.py
│   └── telegram.py
└── # dag1, dag2...dag_n
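
This layout works because Airflow adds the dags folder itself to sys.path, so anything under it is importable with plain absolute imports. Your relative import failed because each DAG file is parsed as a top-level module, so there is no parent package for ".." to refer to. Applied to your original structure, the fix is a one-line change (assuming your folders stay as posted; on Python 2 you would also need an empty __init__.py in utils/):

from utils.util_slack import slack_success  # absolute import, resolved from the dags folder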

default.py - just default DAG params

from airflow.models import Variable

from lib.telegram import send_message

def on_success_callback(context):
    pass


def on_failure_callback(context):
    config = get_main_config()
    if not config.get('NOTIFY_ON_FAILURE'):
        return
    send_message('failed blabla')


def get_main_config():
    # I use an Airflow Variable with key 'MAIN_CONFIG' to store common settings for all DAGs
    return Variable.get('MAIN_CONFIG', deserialize_json=True)


def get_default_args():
    return {
        'email_on_failure': False,
        'email_on_retry': False,
        'on_failure_callback': on_failure_callback,
        'on_success_callback': on_success_callback,
        # etc...
    }
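
A DAG then pulls all of this in with a single absolute import. A minimal sketch (the dag id and schedule here are just placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

from lib.default import get_default_args


with DAG(
        dag_id='example_dag',             # hypothetical name
        start_date=datetime(2019, 1, 1),
        schedule_interval='@daily',
        default_args=get_default_args(),  # shared defaults, incl. callbacks
) as dag:
    start = DummyOperator(task_id='start')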

configurator.py - all the necessary initialization in one place. I use inject, but you can use any tool or approach. This is just an example.

from lib.default import get_main_config
from airflow.hooks.base_hook import BaseHook


class InstancesPool:
    def __init__(self, slack_connection, db_connection):
        self._db_connection = db_connection
        self._slack_connection = slack_connection

    def get_slack_connection(self):
        return self._slack_connection

    def get_db_connection(self):
        return self._db_connection


class DbConnection:
    # just an example
    def __init__(self, user, password):
        pass


def configure():
    config = get_main_config()

    return InstancesPool(
        BaseHook.get_connection('Slack'),
        DbConnection(config['DB_USER'], config['DB_PASSWORD'])
    )

This way you will not have problems with imports or initialization. You just call:

from lib.configurator import configure


def my_task(ds, **kwargs):
    pool = configure()
    # pool.get_slack_connection() etc...
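
To tie this back to your question: the same pattern covers your Slack helpers. A sketch, assuming you move util_slack.py into lib/ (the task id, connection id and command below are hypothetical):

from airflow.contrib.operators.ssh_operator import SSHOperator

from lib.util_slack import slack_success, slack_fail  # absolute import from dags/lib/

# inside your DAG definition (e.g. within a `with DAG(...)` block):
some_task_in_dag = SSHOperator(
    task_id='some_task',
    ssh_conn_id='my_ssh_connection',    # hypothetical connection id
    command='./run_something.sh',       # hypothetical command
    on_success_callback=slack_success,  # Airflow passes the task context to callbacks
    on_failure_callback=slack_fail,
)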

Hope this helps.