
I'm currently using Airflow with the BigQuery operator to trigger various SQL scripts. This works fine when the SQL is written directly in the Airflow DAG file. For example:

bigquery_transform = BigQueryOperator(
    task_id='bq-transform',
    bql='SELECT * FROM `example.table`',
    destination_dataset_table='example.destination'
)

However, I'd like to store the SQL in a separate file saved to a storage bucket. For example:

bql='gs://example_bucket/sample_script.sql'

When referencing this external file, I receive a "Template Not Found" error.

I've seen some examples that load the SQL file into the Airflow DAG folder; however, I'd really like to access files saved to a separate storage bucket. Is this possible?


3 Answers


You can also consider using the gcs_to_gcs operator to copy files from your desired bucket into one that is accessible by Composer, as in the sketch below.
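
A minimal sketch of that approach, assuming Airflow 1.10's contrib operator; the source bucket and object come from the question, while the destination bucket name and task id are placeholders:

from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

# Copy the SQL file from the external bucket into the Composer DAGs bucket,
# where it can then be picked up via template_searchpath.
copy_sql = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='copy-sql-to-dags-bucket',
    source_bucket='example_bucket',
    source_object='sample_script.sql',
    destination_bucket='your-composer-dags-bucket',  # assumption: your environment's DAGs bucket
    destination_object='sql/sample_script.sql'
)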


You can reference any SQL file in your Google Cloud Storage bucket. Here's an example where I call the file Query_File.sql from the SQL directory in my Airflow DAG bucket.

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

CONNECTION_ID = 'project_name'

# default_args (owner, start_date, etc.) is assumed to be defined above.
with DAG('dag',
         schedule_interval='0 9 * * *',
         # Cloud Composer mounts the DAG bucket at /home/airflow/gcs/dags/,
         # so subdirectories such as SQL/ can be searched for templates.
         template_searchpath=['/home/airflow/gcs/dags/'],
         max_active_runs=15,
         catchup=True,
         default_args=default_args) as dag:

    battery_data_quality = BigQueryOperator(
        task_id='task-id',
        # Resolved against template_searchpath and rendered as a Jinja template.
        sql='/SQL/Query_File.sql',
        # The $ decorator plus {{ ds_nodash }} writes to a date partition.
        destination_dataset_table='project-name.DataSetName.TableName${{ds_nodash}}',
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id=CONNECTION_ID,
        use_legacy_sql=False
    )
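
For reference, a hypothetical Query_File.sql could look like the snippet below; because the path is passed through the templated sql parameter, the file is rendered as a Jinja template, so macros such as {{ ds }} are substituted before the query runs (the table and column names here are assumptions):

-- /home/airflow/gcs/dags/SQL/Query_File.sql (hypothetical contents)
SELECT *
FROM `project-name.DataSetName.SourceTable`
WHERE event_date = '{{ ds }}'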

The download behaviour of GoogleCloudStorageDownloadOperator differs between Airflow versions 1.10.3 and 1.10.15. The execute method below derives the object and filename from the dag_run's job_name, downloads the file from the bucket, and pushes its contents to XCom.

def execute(self, context):
    # Derive both the GCS object name and the local filename from the
    # job_name passed in through the dag_run configuration.
    self.object = context['dag_run'].conf['job_name'] + '.sql'
    logging.info('filename in GoogleCloudStorageDownloadOperator: %s', self.object)
    self.filename = context['dag_run'].conf['job_name'] + '.sql'

    self.log.info('Executing download: %s, %s, %s', self.bucket,
                  self.object, self.filename)
    hook = GoogleCloudStorageHook(
        google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
        delegate_to=self.delegate_to
    )
    file_bytes = hook.download(bucket=self.bucket,
                               object=self.object)
    if self.store_to_xcom_key:
        # XCom values are size-limited, so refuse to push very large files.
        if sys.getsizeof(file_bytes) < 49344:
            context['ti'].xcom_push(key=self.store_to_xcom_key,
                                    value=file_bytes.decode('utf-8'))
        else:
            raise RuntimeError(
                'The size of the downloaded file is too large to push to XCom!'
            )
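
For completeness, a minimal sketch of wiring the stock GoogleCloudStorageDownloadOperator (Airflow 1.10 contrib) to a BigQueryOperator so the SQL never has to live in the DAGs folder; the DAG id, task ids, and start date are assumptions, while the bucket and file come from the question:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_download_operator import GoogleCloudStorageDownloadOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG('gcs_sql_example', schedule_interval=None, start_date=datetime(2021, 1, 1)) as dag:

    # Download the SQL file and push its contents to XCom instead of to disk.
    download_sql = GoogleCloudStorageDownloadOperator(
        task_id='download-sql',
        bucket='example_bucket',
        object='sample_script.sql',
        store_to_xcom_key='sql_query'
    )

    # The sql field is templated, so the xcom_pull call is rendered into the
    # actual query text before the job is submitted. Depending on the Airflow
    # version, the XCom value may be raw bytes rather than decoded text, which
    # is the version difference mentioned above.
    run_query = BigQueryOperator(
        task_id='bq-transform',
        sql="{{ task_instance.xcom_pull(task_ids='download-sql', key='sql_query') }}",
        destination_dataset_table='example.destination',
        use_legacy_sql=False
    )

    download_sql >> run_query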