I'm attempting to use the BigQueryOperator in Airflow, populating the sql= attribute from a variable. The problem I'm running into is that the file extension is dropped when the value comes from a Jinja variable. I've set up my code as follows:
dag = DAG(
    dag_id='data_ingest_dag',
    template_searchpath=['/home/airflow/gcs/dags/sql/'],
    default_args=DEFAULT_DAG_ARGS
)

bigquery_transform = BigQueryOperator(
    task_id='bq-transform',
    write_disposition='WRITE_TRUNCATE',
    sql="{{dag_run.conf['sql_script']}}",
    destination_dataset_table='{{dag_run.conf["destination_dataset_table"]}}',
    dag=dag
)
The passed variable contains the name of a SQL file stored in the separate SQL directory. If I pass the value as a static string, sql="example_file.sql", everything works fine. However, when I pass example_file.sql through the Jinja template variable, the file extension is automatically dropped and I receive this error:
BigQuery job failed.
Final error was: {u'reason': u'invalidQuery', u'message': u'Syntax error: Unexpected identifier "example_file" at [1:1]', u'location': u'query'}
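My working theory (just a guess from the observed behavior, not verified against Airflow's source) is that Airflow decides whether to load a templated field from a file by checking the field's raw, unrendered value against the operator's template_ext, so a Jinja expression never triggers the file lookup. A simplified stdlib-only sketch of that check (treated_as_file and TEMPLATE_EXT are my own illustrative names, not Airflow APIs):

```python
# Assumed simplification of Airflow's behavior: the "load from file" decision
# looks at the raw attribute string, before Jinja rendering happens.
TEMPLATE_EXT = ('.sql',)

def treated_as_file(raw_value: str) -> bool:
    """Return True if the raw field value would be looked up as a file."""
    return raw_value.endswith(TEMPLATE_EXT)

# Static string ends in .sql, so the file is resolved via template_searchpath.
print(treated_as_file("example_file.sql"))                  # True

# The Jinja expression itself does not end in .sql, so it is rendered to
# plain text ("example_file.sql") and sent to BigQuery as the query itself,
# which would explain the "Unexpected identifier" error above.
print(treated_as_file("{{ dag_run.conf['sql_script'] }}"))  # False
```

If that guess is right, it would also explain why appending ".sql" to the expression changes the behavior: the raw string then ends in .sql, so Airflow treats the whole literal expression as a filename.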
Additionally, I've tried hardcoding ".sql" onto the end of the variable reference, i.e. sql="{{dag_run.conf['sql_script']}}.sql", anticipating that the extension would be dropped. However, this causes the entire variable reference to be interpreted as a string.
How do you use variables to populate BigQueryOperator attributes?