I need help passing a parameter (an XCom value pushed by a previous task) to a SQL query stored in a .sql file. I cannot get this to work with the "parameters" option, even though that option does render the XCom value from the previous task. What am I doing wrong?

Thanks :)

start = EmptyOperator(
    task_id="start",
)

fetch_cust_id = PythonOperator(
    task_id="fetch",
    python_callable=lambda: 'C001',
)

update_orders = MySqlOperator(
    task_id="update",
    mysql_conn_id="mysql_default",
    database="my_db",
    sql="/update.sql",
    parameters={
        "custid": "{{ ti.xcom_pull(task_ids='fetch') }}"
    }
)

start >> fetch_cust_id >> update_orders

SQL file (update.sql):

UPDATE orders
SET placed = 'yes'
WHERE
custid = {{ custid }}
;

:(

1 Answer

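The parameters argument passes bind values to the underlying database driver when the query is executed. Airflow's template engine does not substitute them into the SQL text, so a Jinja expression such as {{ custid }} in the SQL file is never filled from parameters. If you want to use parameters, the query must use the driver's placeholder syntax. Example: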

sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
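To make the distinction concrete, here is a minimal sketch of driver-side binding using Python's built-in sqlite3 module as a stand-in database. The table layout and the second customer are assumptions for illustration only, and sqlite3 uses :name placeholders where MySQL drivers use %(name)s as in the example above.

```python
import sqlite3

# In-memory stand-in for the orders table (hypothetical schema for illustration).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (custid TEXT, placed TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("C001", "no"), ("C002", "no")],
)

# Driver-side binding: the placeholder stays in the SQL text and the value
# is passed separately to execute(). No string templating is involved.
params = {"custid": "C001"}
conn.execute("UPDATE orders SET placed = 'yes' WHERE custid = :custid", params)

rows = conn.execute(
    "SELECT custid, placed FROM orders ORDER BY custid"
).fetchall()
print(rows)
```

In the question's setup, the equivalent would presumably be to write custid = %(custid)s in update.sql and keep the templated parameters dict, assuming the operator renders parameters as the question reports.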

In your case, though, you want to template an XCom value, so there is no need for parameters at all. You want the rendering to be done by Airflow.

Since sql is a template field, you can put the expression directly in the SQL:

UPDATE orders
SET placed = 'yes'
WHERE custid = "{{ ti.xcom_pull(task_ids='fetch') }}";