How to work with Django models inside Airflow tasks?

According to the official Airflow documentation, Airflow provides hooks for interacting with databases (MySqlHook, PostgresHook, etc.) that operators can then use to execute raw queries. Here are the core code fragments:

Copied from https://airflow.apache.org/_modules/mysql_hook.html

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn

Copied from https://airflow.apache.org/_modules/mysql_operator.html

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)

As we can see, the hook encapsulates the connection configuration, while the operator provides the ability to execute custom queries.
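The same split can be shown without Airflow at all. The following is only a toy sketch using the standard-library sqlite3 module; `SqliteHookSketch` and `SqlOperatorSketch` are hypothetical names invented for illustration and are not part of Airflow's API:

```python
import sqlite3


class SqliteHookSketch:
    """Toy stand-in for a hook: it only knows how to reach the database."""

    def __init__(self, database=":memory:"):
        self.database = database
        self._conn = None

    def get_conn(self):
        # Reuse one connection so an in-memory database survives between calls.
        if self._conn is None:
            self._conn = sqlite3.connect(self.database)
        return self._conn

    def run(self, sql, parameters=None):
        conn = self.get_conn()
        cursor = conn.execute(sql, parameters or ())
        rows = cursor.fetchall()
        conn.commit()
        return rows


class SqlOperatorSketch:
    """Toy stand-in for an operator: it holds the SQL and delegates to the hook."""

    def __init__(self, sql, hook, parameters=None):
        self.sql = sql
        self.hook = hook
        self.parameters = parameters

    def execute(self, context=None):
        return self.hook.run(self.sql, self.parameters)


hook = SqliteHookSketch()
SqlOperatorSketch("CREATE TABLE t (x INTEGER)", hook).execute()
SqlOperatorSketch("INSERT INTO t VALUES (?)", hook, parameters=(25,)).execute()
rows = SqlOperatorSketch("SELECT x FROM t", hook).execute()
```

The operator never touches connection details, and the hook never decides which query to run, which mirrors the division of labour in the Airflow fragments above.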

The problem:

It is often more convenient to use an ORM for fetching and processing database objects instead of raw SQL, for the following reasons:

  1. In straightforward cases, an ORM can be a much more convenient solution; see ORM definitions.
  2. Assume there are already established systems, such as Django, with defined models and their methods. Every time the schemas of these models change, the raw SQL queries in Airflow need to be rewritten. An ORM provides a unified interface for working with such models.

For some reason, there are no examples of working with an ORM in Airflow tasks in terms of hooks and operators. According to the question "Using Django database layer outside of Django?", one needs to set up the database connection configuration and then simply execute queries through the ORM. But doing that outside of appropriate hooks / operators breaks Airflow principles. It's like calling BashOperator with a "python work_with_django_models.py" command.

Finally, we want this:

So what are the best practices in this case? Are there any shared hooks / operators for the Django ORM or other ORMs that would make the following code real (treat it as pseudo-code!):

import os
import django
os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()
from your_app import models

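def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field='abc')
    # Fetch the instance once: indexing a lazy queryset twice runs two
    # queries and returns two separate objects, so the change would be lost.
    obj = all_objects[15]
    obj.my_int_field = 25
    obj.save()
    return list(all_objects)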

django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')

instead of implementing this functionality in raw SQL.

I think this is a pretty important topic, as a whole bunch of ORM-based frameworks and processes cannot easily be brought into Airflow in this case.

Thanks in advance!

I can see the convenience of accessing your Django models inside of Airflow. But I would argue for keeping that logic within Django if possible, then exposing that logic through perhaps a management command called via a BashOperator. It’ll make it easier to test that code independently and run it outside of Airflow if needed. But depending on your use case I could see that getting unnecessarily hairy. Also a downside would be exception handling isn’t as straightforward. (Daniel Huang)

1 Answer

I agree we should continue to have this discussion, as having access to the Django ORM can significantly reduce the complexity of solutions.

My approach has been to 1) create a DjangoOperator:

import os, sys

from airflow.models import BaseOperator


def setup_django_for_airflow():
    # Add Django project root to path
    sys.path.append('./project_root/')

    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

    import django
    django.setup()


class DjangoOperator(BaseOperator):

    def pre_execute(self, *args, **kwargs):
        setup_django_for_airflow()

and 2) extend that DjangoOperator for any logic / operators that would benefit from having access to the ORM:

from .base import DjangoOperator


class DjangoExampleOperator(DjangoOperator):

    def execute(self, context):
        from myApp.models import model
        model.objects.get_or_create()

With this strategy, you can distinguish between operators that use raw SQL and those that use the ORM. Also note that for the Django operator, all Django model imports need to be inside the execution context, as demonstrated above.
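To see why the import has to wait until execute, here is a toy simulation that needs neither Django nor Airflow. Every name in it (`ToyRegistry`, `toy_setup`, `load_toy_model`, `ToyOperator`) is hypothetical. It assumes only that the framework refuses to hand out models before its setup call has run, and that pre_execute is called before execute:

```python
class ToyRegistry:
    """Stands in for the framework's 'are the apps loaded yet?' state."""
    ready = False


def toy_setup():
    # Plays the role of the setup call made in pre_execute.
    ToyRegistry.ready = True


def load_toy_model():
    # Plays the role of importing a models module.
    if not ToyRegistry.ready:
        raise RuntimeError("apps aren't loaded yet")
    return "MyModel"


class ToyOperator:
    def pre_execute(self, context):
        toy_setup()

    def execute(self, context):
        # Deferred "import": by now pre_execute has configured the framework.
        return load_toy_model()

    def run(self, context=None):
        # Assumed call order: pre_execute first, then execute.
        self.pre_execute(context)
        return self.execute(context)


# Loading the model at module level, before any setup, fails.
try:
    load_toy_model()
    early_result = "loaded"
except RuntimeError as exc:
    early_result = "failed: " + str(exc)

# Loading it inside execute works, because setup has already run.
late_result = ToyOperator().run()
```

A module-level model import in the DAG file corresponds to the early call here: it would run when the file is parsed, before any operator's pre_execute has had a chance to configure Django.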