How to work with Django models inside Airflow tasks?
According to the official Airflow documentation, Airflow provides hooks for interacting with databases (such as MySqlHook, PostgresHook, etc.) that can later be used in operators to execute raw queries. Here are the core code fragments:
Copy from https://airflow.apache.org/_modules/mysql_hook.html
class MySqlHook(DbApiHook):

    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn
Copy from https://airflow.apache.org/_modules/mysql_operator.html
class MySqlOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters

    def execute(self, context):
        logging.info('Executing: ' + str(self.sql))
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)
As we can see, the hook encapsulates the connection configuration, while the operator provides the ability to execute custom queries.
The problem:
It's very convenient to use an ORM for fetching and processing database objects instead of raw SQL, for the following reasons:
- In straightforward cases, an ORM can be a much more convenient solution (see ORM definitions).
- Assume there are already established systems, such as Django, with defined models and their methods. Every time the schemas of these models change, the raw SQL queries in Airflow need to be rewritten. An ORM provides a unified interface for working with such models.
For some reason, there are no examples of working with an ORM in Airflow tasks in terms of hooks and operators. According to the question "Using Django database layer outside of Django?", one needs to set up the database connection configuration and then simply execute queries through the ORM, but doing that outside the appropriate hooks/operators breaks Airflow principles. It's like calling BashOperator with a "python work_with_django_models.py" command.
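To make the "set up a connection configuration" step concrete, here is a minimal sketch that maps the fields a hook already reads (`login`, `password`, `host`, `schema`, as in `MySqlHook.get_conn` above) onto a Django `DATABASES` entry. The helper name `django_db_settings` and the `FakeConnection` stand-in are hypothetical and only used for illustration; with a real Airflow connection, the result could presumably be passed to `django.conf.settings.configure(DATABASES=...)` before calling `django.setup()`.

```python
from collections import namedtuple

# Hypothetical stand-in for an Airflow connection object; only the
# attributes read by MySqlHook.get_conn (plus port) are modelled here.
FakeConnection = namedtuple("FakeConnection", "login password host schema port")


def django_db_settings(conn, engine="django.db.backends.mysql"):
    """Build a Django DATABASES dict from a connection-like object."""
    return {
        "default": {
            "ENGINE": engine,
            "NAME": conn.schema or "",
            "USER": conn.login or "",
            "PASSWORD": conn.password or "",
            # Same fallback as MySqlHook.get_conn uses for a missing host.
            "HOST": conn.host or "localhost",
            "PORT": str(conn.port) if conn.port else "",
        }
    }


# Example: a connection with no password and no host configured.
conn = FakeConnection("airflow", None, None, "myapp", 3306)
databases = django_db_settings(conn)
```

This keeps the credentials in Airflow's connection store instead of duplicating them in a Django settings file, which is the part of the hook design that seems worth preserving.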
Finally, we want this:
So what are the best practices in this case? Are there any shared hooks/operators for the Django ORM or other ORMs? The goal is to make the following code real (treat it as pseudo-code!):
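import os
import django

os.environ.setdefault(
    "DJANGO_SETTINGS_MODULE",
    "myapp.settings"
)
django.setup()

from your_app import models  # must come after django.setup()


def get_and_modify_models(ds, **kwargs):
    all_objects = models.MyModel.objects.filter(my_str_field='abc')
    # On an unevaluated queryset, each all_objects[15] runs its own query and
    # returns a separate instance, so modify and save the same object.
    obj = all_objects[15]
    obj.my_int_field = 25
    obj.save()
    return list(all_objects)


# DjangoOperator is hypothetical; it is the operator being asked for.
django_op = DjangoOperator(task_id='get_and_modify_models', owner='airflow')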
instead of implementing this functionality in raw SQL.
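One possible workaround, sketched here under assumptions, is to keep the stock `PythonOperator` and defer the Django setup until the task actually runs, so that merely parsing the DAG file does not require Django. The `with_django` decorator is a hypothetical helper, not part of Airflow or Django; `myapp.settings`, `your_app` and `MyModel` are the placeholder names from the pseudo-code above.

```python
import functools
import importlib
import os


def with_django(settings_module):
    """Hypothetical decorator: set up Django when the task runs."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            os.environ.setdefault("DJANGO_SETTINGS_MODULE", settings_module)
            # Imported lazily so that parsing the DAG file does not need Django.
            django = importlib.import_module("django")
            django.setup()
            return func(*args, **kwargs)
        return wrapper
    return decorator


@with_django("myapp.settings")
def get_and_modify_models(ds=None, **kwargs):
    # Models can only be imported once django.setup() has run.
    from your_app import models

    all_objects = models.MyModel.objects.filter(my_str_field="abc")
    obj = all_objects[15]
    obj.my_int_field = 25
    obj.save()
    # Return primary keys rather than model instances.
    return list(all_objects.values_list("pk", flat=True))
```

The decorated function could then be passed as `python_callable` to a regular `PythonOperator`. Returning primary keys instead of model instances is a choice made for this sketch so that the return value stays easy to serialise.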
I think it's a pretty important topic, as a whole bunch of ORM-based frameworks and processes cannot be plugged into Airflow in this case.
Thanks in advance!
BashOperator. It’ll make it easier to test that code independently and run it outside of Airflow if needed. But depending on your use case I could see that getting unnecessarily hairy. Also a downside would be exception handling isn’t as straightforward. – Daniel Huang