
I would like to instantiate a task (through Airflow) that copies files from a bucket on Google Cloud Storage to a Google Drive.

I use the dedicated operator, which is imported as follows:

from airflow.contrib.operators.gcs_to_gdrive_operator import GcsToGDriveOperator

then the operator itself:

copy_files = GcsToGDriveOperator(
    task_id="copy_files",
    source_bucket=GCS_BUCKET_ID,
    source_object='{}/{}/forecasted/*'.format(COUNTRY_TRIGRAM, PRED_START_RANGE),
    destination_object="content/drive/Shared Drives/FORECAST_TEST",
    gcp_conn_id="airflow_service_account_conn_w_drive",
)

The task is successful but does not copy the files to the "destination_object", which is the part I'm not sure what to put in.


1 Answer


Reviewing the Airflow GcsToGDriveOperator source code, I assume Airflow leverages the gcs_hook.download() method to download the files from GCS and gdrive_hook.upload_file() to upload these objects to the target Gdrive location.

Given the above, the gcs_hook.download() method logs each successful download operation:

self.log.info('File downloaded to %s', filename)

Similarly, gdrive_hook.upload_file() writes a logging message for each file upload iteration:

self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)

Even though the task succeeded, I believe you can capture the above-mentioned events in the Airflow logs for that particular task, looking for the actual source and destination location paths derived from the GcsToGDriveOperator() definition.

You can even consider inspecting the Airflow worker logs by connecting to the GKE cluster and launching the kubectl command-line tool:

kubectl logs deployment/airflow-worker -n $(kubectl get ns| grep composer*| awk '{print $1}') -c airflow-worker | grep 'Executing copy'
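Here, the inner kubectl get ns lookup resolves the Cloud Composer namespace, and the grep on 'Executing copy' should surface the per-object copy messages, revealing the exact gs:// source and gdrive:// destination paths the operator actually used.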