I'm having trouble connecting to Airflow's metadata database from Python.
The connection is set up, and I can query the metadata database using the UI's Ad Hoc Query window.
If I try to use the same connection from Python, it doesn't work.
In detail, I have two connections set up, and both work from the UI:
Connection 1:
  Conn type: MySQL
  Host: airflow-sqlproxy-service
  Schema: composer-1-6-1-airflow-1-10-0-57315b5a
  Login: root
Connection 2:
  Conn type: MySQL
  Host: 127.0.0.1
  Schema: composer-1-6-1-airflow-1-10-0-57315b5a
  Login: root
As I said, both of them work from the UI (Data Profiling -> Ad Hoc Query).
But whenever I create a DAG and try to use these connections from a task through various hooks, I always get the same error message.
Sample code 1:
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.mysql_operator import MySqlOperator
from airflow.hooks.mysql_hook import MySqlHook


class ReturningMySqlOperator(MySqlOperator):
    """MySqlOperator that returns the fetched rows so they are pushed to XCom."""

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                         schema=self.database)
        return hook.get_records(
            self.sql,
            parameters=self.parameters)


with DAG(
    "a_JSON_db",
    start_date=datetime(2020, 11, 19),
    max_active_runs=1,
    schedule_interval=None,
    # catchup=False  # enable if you don't want historical dag runs to run
) as dag:

    t1 = ReturningMySqlOperator(
        task_id='basic_mysql',
        mysql_conn_id='airflow_db_local',
        sql="select * from xcom")

    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='basic_mysql')
        string_to_print = 'Value in xcom is: {}'.format(xcom)
        # Get data in your logs
        logging.info(string_to_print)

    t2 = PythonOperator(
        task_id='records',
        provide_context=True,
        python_callable=get_records)

    # Run the query before reading its XCom value
    t1 >> t2
Sample code 2:
# Assumes the same imports as sample code 1 and an enclosing DAG.
def get_dag_ids(**kwargs):
    # Note: this operator is instantiated here but never executed.
    mysql_op = MySqlOperator(
        task_id='query_table_mysql',
        mysql_conn_id="airflow_db",
        sql="SELECT MAX(execution_date) FROM task_instance WHERE dag_id = 'Test_Dag'")
    hook = MySqlHook(mysql_conn_id="airflow_db_local")
    records = hook.get_records(sql="SELECT MAX(execution_date) FROM task_instance")
    print(records)

t1 = PythonOperator(
    task_id="get_dag_nums",
    python_callable=get_dag_ids,
    provide_context=True)
The error message is this:
ERROR - (2003, "Can't connect to MySQL server on '127.0.0.1' (111)")
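On Linux, the trailing 111 is the OS error code for "connection refused", so the failure seems to happen at the TCP level, before any login is attempted. To check that separately from the MySQL client, here is a minimal sketch of a reachability test that could be called from inside a PythonOperator callable. The function name `can_connect` is hypothetical, and the default port 3306 is an assumption (the usual MySQL port), since neither connection above specifies a port.

```python
import socket


def can_connect(host, port=3306, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened.

    The default port 3306 is an assumption (the standard MySQL port).
    """
    try:
        # create_connection resolves the host name and performs the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and DNS resolution failures
        return False
```

Inside a task, logging `can_connect("127.0.0.1")` and `can_connect("airflow-sqlproxy-service")` would show which of the two hosts is reachable from the worker that actually runs the DAG.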
I looked at the configuration and found this entry:
Section: core
Key: sql_alchemy_conn
Value: mysql+mysqldb://root:@127.0.0.1/composer-1-6-1-airflow-1-10-0-57315b5a
Source: env var
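To make sure I'm reading that URI correctly, here is a small sketch that splits it into its parts using only the standard library. The helper name `describe_conn_uri` is hypothetical; it is not an Airflow function.

```python
from urllib.parse import urlsplit


def describe_conn_uri(uri):
    """Split a SQLAlchemy-style connection URI into its components."""
    parts = urlsplit(uri)
    return {
        "driver": parts.scheme,            # e.g. "mysql+mysqldb"
        "login": parts.username,
        "host": parts.hostname,
        "port": parts.port,                # None when the URI has no explicit port
        "schema": parts.path.lstrip("/"),  # database name
    }


info = describe_conn_uri(
    "mysql+mysqldb://root:@127.0.0.1/composer-1-6-1-airflow-1-10-0-57315b5a"
)
print(info)
```

The host comes out as 127.0.0.1 with no explicit port, which matches Connection 2 above.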
I also tried a Postgres connection with this URI and got the same error message as above.
I'm starting to think that the IAP in front of GCP's Airflow is blocking access from a Python DAG.
My Composer version is: composer-1.6.1-airflow-1.10.0
Can anyone help me?