1 vote

Currently there is no S3ToBigQuery operator.

My choices are:

  1. Use the S3ToGoogleCloudStorageOperator and then use the GoogleCloudStorageToBigQueryOperator

    This is not something I'm eager to do, since it means paying for storage twice. Even if I remove the file from one of the storage systems, that still involves payment.

  2. Download the file from S3 to the local file system and load it into BigQuery from there - However, there is no S3DownloadOperator. This means writing the whole process from scratch without Airflow's involvement, which misses the point of using Airflow.

Is there another option? What would you suggest?


3 Answers

1 vote

This is what I ended up with. It should be converted into an S3toLocalFile operator.

import logging

from airflow.hooks.S3_hook import S3Hook
from airflow.models import Variable


def download_from_s3(**kwargs):
    hook = S3Hook(aws_conn_id='project-s3')

    # read_key returns the object's contents as a string
    result = hook.read_key(bucket_name='stage-project-metrics',
                           key='{}.csv'.format(kwargs['ds']))

    if not result:
        logging.info('no data found')
    else:
        outfile = '{}project{}.csv'.format(Variable.get("data_directory"),
                                           kwargs['ds'])
        with open(outfile, 'w') as f:
            f.write(result)

    return result
0 votes

If the first option is too costly, you can use the S3Hook to download the file from within a PythonOperator:

from datetime import datetime

from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

def download_from_s3(**kwargs):
    hook = S3Hook(aws_conn_id='s3_conn')

    # read_key returns the object's contents as a string
    return hook.read_key(bucket_name='workflows-dev',
                         key='test_data.csv')

dag = DAG('s3_download',
          schedule_interval='@daily',
          default_args=default_args,
          catchup=False)

with dag:
    download_data = PythonOperator(
        task_id='download_data',
        python_callable=download_from_s3,
        provide_context=True
    )
0 votes

What you can do instead is use the S3ToGoogleCloudStorageOperator followed by the GoogleCloudStorageToBigQueryOperator with the external_table flag, i.e. pass external_table=True.

This creates an external table that points to the GCS location. Your data isn't stored in BigQuery, but you can still query it.
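A minimal sketch of that approach, assuming an Airflow 1.x install with the contrib operators available; the connection ids, bucket names, prefixes, and the dataset/table name are all placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG('s3_to_bq_external',
          schedule_interval='@daily',
          start_date=datetime(2018, 1, 1),
          catchup=False)

with dag:
    # Copy the S3 objects into a GCS bucket
    s3_to_gcs = S3ToGoogleCloudStorageOperator(
        task_id='s3_to_gcs',
        bucket='my-s3-bucket',                 # placeholder S3 bucket
        prefix='metrics/',                     # placeholder key prefix
        dest_gcs_conn_id='google_cloud_default',
        dest_gcs='gs://my-gcs-bucket/metrics/')

    # Register an external BigQuery table over the GCS files;
    # BigQuery reads the data in place instead of loading it
    gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq',
        bucket='my-gcs-bucket',
        source_objects=['metrics/*.csv'],
        destination_project_dataset_table='my_project.my_dataset.metrics',
        source_format='CSV',
        external_table=True)

    s3_to_gcs >> gcs_to_bq
```

Since the external table only references the files, the GCS copy must remain in place for queries to keep working.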