1 vote

I have set up an Airflow workflow that ingests some files from S3 into Google Cloud Storage and then runs a series of SQL queries to create new tables in BigQuery. At the end of the workflow I need to push the output of one final BigQuery table to Google Cloud Storage and from there to S3.

I have cracked the transfer of the BigQuery table to Google Cloud Storage with no issues using the BigQueryToCloudStorageOperator. However, the transfer from Google Cloud Storage to S3 seems to be a less-trodden route, and I have been unable to find a solution that I can automate in my Airflow workflow.
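For reference, the export step looks roughly like this (project, dataset, table and bucket names are placeholders):

from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator

# Export the final BigQuery table to a CSV file in Google Cloud Storage
bq_to_gcs = BigQueryToCloudStorageOperator(
    task_id='bq_to_gcs',
    source_project_dataset_table='my-project.my_dataset.final_table',
    destination_cloud_storage_uris=['gs://my-gcs-bucket/exports/final_table.csv'],
    export_format='CSV',
    dag=dag)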

I am aware of rsync, which comes as part of gsutil, and have gotten it working (see the post Exporting data from Google Cloud Storage to Amazon S3), but I have been unable to add it to my workflow.

I have a Dockerised Airflow container running on a Compute Engine instance.

Would really appreciate help solving this problem.

Many thanks!


3 Answers

6 votes

We are also using rsync to move data between S3 and GCS.

You first need to get a bash script working, something like gsutil -m rsync -d -r gs://bucket/key s3://bucket/key

For S3 you also need to provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as environment variables.

Then define your BashOperator and put it in your DAG file:

rsync_yesterday = BashOperator(task_id='rsync_task_' + table,
                               bash_command='Your rsync script',
                               dag=dag)
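
Putting that together, a minimal sketch of such a task might look like this (bucket names are placeholders, and the AWS credentials would normally come from an Airflow Variable or connection rather than literals):

import os

from airflow.operators.bash_operator import BashOperator

# BashOperator's env replaces the inherited environment entirely,
# so copy os.environ and add the AWS credentials on top of it.
rsync_env = dict(os.environ,
                 AWS_ACCESS_KEY_ID='YOUR_AWS_ACCESS_KEY_ID',
                 AWS_SECRET_ACCESS_KEY='YOUR_AWS_SECRET_ACCESS_KEY')

gcs_to_s3 = BashOperator(
    task_id='gcs_to_s3_rsync',
    # Sync the exported files from the GCS bucket to the S3 bucket
    bash_command='gsutil -m rsync -d -r gs://my-gcs-bucket/exports s3://my-s3-bucket/exports',
    env=rsync_env,
    dag=dag)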

0 votes

Google recommends using its Storage Transfer Service for transfers between cloud platforms. You can programmatically set up a transfer using its Python API, so the data is transferred directly between S3 and Google Cloud Storage. The disadvantage of using gsutil rsync is that the data has to pass through the machine/instance that executes the rsync command, which can become a bottleneck.

Google Cloud Storage Transfer Service Doc
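
For example, a transfer job can be created with the Google API Python client along these lines (a sketch only: project, bucket and credential values are placeholders, and the documented examples use S3 as the source and a GCS bucket as the sink, so check which directions the service supports for your case):

import googleapiclient.discovery

# Build a client for the Storage Transfer Service (uses application
# default credentials on the machine running this code).
client = googleapiclient.discovery.build('storagetransfer', 'v1')

# One-off transfer job; all values below are placeholders.
transfer_job = {
    'description': 'Example S3 -> GCS transfer',
    'status': 'ENABLED',
    'projectId': 'my-gcp-project',
    'schedule': {
        'scheduleStartDate': {'year': 2019, 'month': 1, 'day': 1},
        'scheduleEndDate': {'year': 2019, 'month': 1, 'day': 1},
    },
    'transferSpec': {
        'awsS3DataSource': {
            'bucketName': 'my-s3-bucket',
            'awsAccessKey': {
                'accessKeyId': 'YOUR_AWS_ACCESS_KEY_ID',
                'secretAccessKey': 'YOUR_AWS_SECRET_ACCESS_KEY',
            },
        },
        'gcsDataSink': {'bucketName': 'my-gcs-bucket'},
    },
}

result = client.transferJobs().create(body=transfer_job).execute()
print('Created transfer job: {}'.format(result['name']))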

0 votes

I had a requirement to copy objects from a GCS bucket to S3 using AWS Lambda.

The Python boto3 library can list and download objects from a GCS bucket when pointed at GCS's S3-compatible (interoperability) endpoint with HMAC credentials.

Below is sample Lambda code that copies the "sample-data.csv" object from a GCS bucket to an S3 bucket (as "sample-data-s3.csv").

import boto3
import io

s3 = boto3.resource('s3')

# GCS HMAC (interoperability) credentials generated in the GCP console
google_access_key_id = "GOOG1EIxxMYKEYxxMQ"
google_access_key_secret = "QifDxxMYSECRETKEYxxVU1oad1b"

gc_bucket_name = "my_gc_bucket"


def get_gcs_objects(google_access_key_id, google_access_key_secret,
                    gc_bucket_name):
    """Lists objects in a GCS bucket and copies one to S3 using the boto3 SDK."""
    # Point a boto3 "S3" client at GCS's S3-compatible endpoint
    client = boto3.client("s3", region_name="auto",
                          endpoint_url="https://storage.googleapis.com",
                          aws_access_key_id=google_access_key_id,
                          aws_secret_access_key=google_access_key_secret)

    # Call GCS to list objects in gc_bucket_name
    response = client.list_objects(Bucket=gc_bucket_name)

    # Print object names
    print("Objects:")
    for blob in response["Contents"]:
        print(blob)

    # Download one object from GCS into memory and upload it to S3
    s3_object = s3.Object('my_aws_s3_bucket', 'sample-data-s3.csv')
    f = io.BytesIO()
    client.download_fileobj(gc_bucket_name, "sample-data.csv", f)
    s3_object.put(Body=f.getvalue())


def lambda_handler(event, context):
    get_gcs_objects(google_access_key_id, google_access_key_secret, gc_bucket_name)

You can loop through the blobs in response["Contents"] to copy every object from the GCS bucket, as sketched below.
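
For instance, a minimal version of that loop, reusing the client, bucket names and imports from the snippet above, could be:

for blob in response["Contents"]:
    key = blob["Key"]
    buf = io.BytesIO()
    # Download each object from the GCS bucket into memory...
    client.download_fileobj(gc_bucket_name, key, buf)
    # ...and upload it under the same key to the S3 bucket
    s3.Object('my_aws_s3_bucket', key).put(Body=buf.getvalue())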

Hope this helps someone who wants to use AWS Lambda to transfer objects from a GCS bucket to an S3 bucket.