
I'm having trouble connecting to Airflow's metadata database from Python.

The connection is set up, and I can query the metadata database using the UI's Ad Hoc Query window.

If I try to use the same connection from Python, it doesn't work.

In detail, I have two connections set up, and both work from the UI:

Connection 1:
Conn type: MySQL
Host: airflow-sqlproxy-service
Schema: composer-1-6-1-airflow-1-10-0-57315b5a
Login: root

Connection 2:
Conn type: MySQL
Host: 127.0.0.1
Schema: composer-1-6-1-airflow-1-10-0-57315b5a
Login: root

As I said, both of them work from the UI (Data Profiling -> Ad Hoc Query).

But whenever I create a DAG and try to query from a PythonOperator using various hooks, I always get the same error message.

Sample code 1:

import logging

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook

class ReturningMySqlOperator(MySqlOperator):
    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_records(
            self.sql,
            parameters=self.parameters)

with DAG(
    "a_JSON_db",
    start_date=datetime(2020, 11, 19),
    max_active_runs=1,
    schedule_interval=None,
    # catchup=False # enable if you don't want historical dag runs to run
) as dag:

    t1 = ReturningMySqlOperator(
        task_id='basic_mysql',
        mysql_conn_id='airflow_db_local',
        sql="select * from xcom")
    
    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='basic_mysql')
        string_to_print = 'Value in xcom is: {}'.format(xcom)
        # Get data in your logs
        logging.info(string_to_print)
    
    t2 = PythonOperator(
        task_id='records',
        provide_context=True,
        python_callable=get_records)

    t1 >> t2

Sample code 2:

def get_dag_ids(**kwargs):
    mysql_hook = MySqlOperator(
        task_id='query_table_mysql',
        mysql_conn_id="airflow_db",
        sql="SELECT MAX(execution_date) FROM task_instance WHERE dag_id = 'Test_Dag'")
    MySql_Hook = MySqlHook(mysql_conn_id="airflow_db_local")
    records = MySql_Hook.get_records(sql="SELECT MAX(execution_date) FROM task_instance")
    print(records)
t1 = PythonOperator(
    task_id="get_dag_nums",
    python_callable=get_dag_ids,
    provide_context=True)

The error message is this:

ERROR - (2003, "Can't connect to MySQL server on '127.0.0.1' (111)")

I looked up the config, and I found this setting (provided via an env var):

[core] sql_alchemy_conn = mysql+mysqldb://root:@127.0.0.1/composer-1-6-1-airflow-1-10-0-57315b5a

I tried to use a Postgres connection with this URI as well, and got the same error message as above.

I'm starting to think that GCP Airflow's IAP is blocking me from having access from a Python DAG.

My Cloud Composer version is the following: composer-1.6.1-airflow-1.10.0

Can anyone help me?


1 Answer


Only the service account associated with Composer has read access to the metadata database in the tenant project. This involves connecting from the underlying Kubernetes (Airflow) cluster to the tenant project hosting Airflow's Cloud SQL instance.

The supported connection methods are SQLAlchemy and going through the Kubernetes cluster, which acts as a proxy. Connections from GKE use the airflow-sqlproxy-service.default service discovery name.
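In other words, from inside a DAG the hook should be pointed at the proxy service rather than 127.0.0.1. A minimal sketch, assuming a connection named airflow_db_proxy (a placeholder name; use whichever of your connections has Host set to airflow-sqlproxy-service, like Connection 1 above):

import logging

from airflow.hooks.mysql_hook import MySqlHook

def query_metadata_db(**kwargs):
    # 'airflow_db_proxy' is a placeholder connection id; it is assumed to be
    # configured like Connection 1 in the question (Host: airflow-sqlproxy-service,
    # Schema: composer-1-6-1-airflow-1-10-0-57315b5a, Login: root).
    hook = MySqlHook(mysql_conn_id='airflow_db_proxy')
    records = hook.get_records(sql="SELECT MAX(execution_date) FROM task_instance")
    logging.info('Records: %s', records)

The only difference from the failing examples is which connection the hook resolves: the host must be the in-cluster proxy service, not the local loopback address that only exists for the Airflow web server.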

You also have a CLI option through GKE. Use the following command to run a temporary deployment and pod using the mysql:latest image, which comes with the mysql CLI tool preinstalled:

$ kubectl run mysql-cli-tmp-deployment  --generator=run-pod/v1  --rm --stdin --tty  --image mysql:latest  --  bash

Once in the shell, we can use the mysql tool to open an interactive session with the Airflow database:

 $ mysql  --user root  --host airflow-sqlproxy-service  --database airflow-db

Once the session is open, standard queries can be executed against the database.
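For example (an illustrative query, not part of the original answer), recent task instances can be inspected from the mysql prompt:

SELECT dag_id, task_id, state, execution_date FROM task_instance LIMIT 10;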