`airflow initdb` successfully creates the tables but then freezes at this step:

INFO  [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password

The Alembic upgrade freezes because my DAG contains this code:

    session = settings.Session()
    conns: Iterable[Connection] = (
        session.query(Connection.conn_id)
        .filter(and_(
            Connection.conn_id.ilike(f'{CONN_PREFIX}%'),
            Connection.conn_type == CONN_TYPE,
        ))
        .all()
    )
    return [conn.conn_id for conn in conns]

I use it to create tasks on the fly, based on Airflow Connections that have a special prefix.

But Airflow executes the DAG code as part of the `initdb` command. So my query holds a lock on the `connection` table, the Alembic upgrade script cannot alter the table, and it freezes. Deadlock.

As I understand it, I have to release the lock in my DAG code somehow. Should I end or reopen the transaction? How do I do that?

1 Answer

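OK, once I finally understood the problem, the solution turned out to be pretty simple: end the transaction after the query so that the lock on the `connection` table is released.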

    from typing import Iterable
    from airflow import settings
    from airflow.models import Connection
    from sqlalchemy import and_

    CONN_TYPE = 'fs'
    CONN_PREFIX = 'my_special_conn_'

    session = settings.Session()
    try:
        conns: Iterable[Connection] = (
            session.query(Connection.conn_id)
            .filter(and_(
                Connection.conn_id.ilike(f'{CONN_PREFIX}%'),
                Connection.conn_type == CONN_TYPE,
            ))
            .all()
        )
        conn_ids = [conn.conn_id for conn in conns]
    finally:
        # Committing ends the transaction, which releases the lock on the connection table.
        session.commit()
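
If several DAG files need the same pattern, the try/finally above could be wrapped in a small context manager. This is only a sketch, not something Airflow provides under this name: `short_lived_session` is a hypothetical helper, and it assumes the factory you pass in (for example `settings.Session`) returns an object with `commit()`, `rollback()` and `close()` methods, as SQLAlchemy sessions do. Unlike the snippet above, it rolls back on error and also closes the session.

```python
from contextlib import contextmanager


@contextmanager
def short_lived_session(session_factory):
    """Yield a session and always end its transaction afterwards.

    `session_factory` is assumed to be a zero-argument callable that
    returns an object with commit(), rollback() and close() methods.
    """
    session = session_factory()
    try:
        yield session
        # Success: commit ends the transaction and releases its locks.
        session.commit()
    except Exception:
        # Failure: rollback also ends the transaction; re-raise the error.
        session.rollback()
        raise
    finally:
        # In both cases, hand the underlying connection back.
        session.close()


# Hypothetical usage inside a DAG file (requires Airflow to be importable):
#
#     from airflow import settings
#     from airflow.models import Connection
#
#     with short_lived_session(settings.Session) as session:
#         rows = session.query(Connection.conn_id).all()
#         conn_ids = [row.conn_id for row in rows]
```

The point of the design is that the transaction never outlives the `with` block, so nothing is left holding the table when the Alembic migration runs.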