2
votes

New to Airflow. I am trying to save a result to a file in another bucket (not the Airflow one). I can save to a file at '/home/airflow/gcs/data/test.json' and then use gcs_hook.GoogleCloudStorageHook to copy it to another bucket. Here is the code:

import json
from airflow.contrib.hooks import gcs_hook

def write_file_func(**context):
    file = '/home/airflow/gcs/data/test.json'
    with open(file, 'w') as f:
        f.write(json.dumps({"name": "aaa", "age": "10"}))

def upload_file_func(**context):
    conn = gcs_hook.GoogleCloudStorageHook()
    source_bucket = 'source_bucket'
    source_object = 'data/test.json'
    target_bucket = 'target_bucket'
    target_object = 'test.json'
    conn.copy(source_bucket, source_object, target_bucket, target_object)
    conn.delete(source_bucket, source_object)

My questions are:

  1. Can we write a file directly to the target bucket? I didn't find any method for that in gcs_hook.

  2. I tried to use google.cloud.storage's bucket.blob('test.json').upload_from_string(), but Airflow keeps saying "The DAG isn't available in the server's DAGBag", which is very annoying. Are we not allowed to use that API in a DAG?

  3. If we can use the google.cloud.storage/bigquery APIs directly, what is the difference between that and the Airflow hooks, like gcs_hook/bigquery_hook?

Thanks


1 Answer

2
votes
  1. No, you cannot directly write to a file in the target bucket. To modify a file stored in GCS you need to download it locally, make the changes, and then upload the modified file back to GCS. For more information, refer to the Google Cloud Storage documentation on objects and their methods.
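
For illustration, here is a minimal sketch of that download/modify/upload cycle using the same GoogleCloudStorageHook from airflow.contrib; the bucket and object names are placeholders:

import json
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

def modify_gcs_file_func():
    hook = GoogleCloudStorageHook()
    local_path = '/home/airflow/gcs/data/test.json'

    # Pull the object down into the local data folder
    hook.download('target_bucket', 'test.json', filename=local_path)

    # Make the change locally
    with open(local_path) as f:
        data = json.load(f)
    data['age'] = '11'
    with open(local_path, 'w') as f:
        json.dump(data, f)

    # Push the modified file back to the bucket
    hook.upload('target_bucket', 'test.json', local_path)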

  2. I have run the code below in Apache Airflow successfully; feel free to use it.

import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

def write_file_func():
    # In Cloud Composer, /home/airflow/gcs/data/ maps to the data/ folder of the
    # environment's bucket, so this file shows up there as data/test.json.
    file = '/home/airflow/gcs/data/test.json'
    with open(file, 'w') as f:
        f.write(json.dumps({"name": "aaa", "age": "10"}))

def upload_file_func():
    conn = GoogleCloudStorageHook()
    source_bucket = 'source_bucket'
    source_object = 'data/test.json'
    target_bucket = 'target_bucket'
    target_object = 'test.json'
    # Server-side copy from the source bucket to the target bucket
    conn.copy(source_bucket, source_object, target_bucket, target_object)
    #conn.delete(source_bucket, source_object)

with DAG('load_gcs_file', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
    create_file = PythonOperator(task_id='create_file', python_callable=write_file_func)
    copy_file = PythonOperator(task_id='copy_file', python_callable=upload_file_func)

    create_file >> copy_file

Note: please change the source_bucket value to reflect your source bucket name, and the target_bucket value to reflect your target bucket name.

  3. Airflow hooks are reusable interfaces to external libraries (such as google.cloud.storage), so that many different operators can talk to those external APIs in a consistent way.
     A generic example: when an external library is updated, instead of updating the code in every single place where the library was used, only the hook code needs to change.
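
To make the contrast concrete, here is a rough sketch of the two approaches side by side; it assumes the google-cloud-storage client library is importable in your environment, and the bucket/object names are placeholders:

import json
from google.cloud import storage
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

def upload_with_client():
    # Direct use of the client library: you create and configure the client yourself.
    client = storage.Client()
    bucket = client.bucket('target_bucket')
    bucket.blob('test.json').upload_from_string(
        json.dumps({"name": "aaa", "age": "10"}),
        content_type='application/json')

def upload_with_hook():
    # The hook wraps the same API behind Airflow's connection handling,
    # so credentials come from the configured GCP connection rather than the task.
    hook = GoogleCloudStorageHook()
    hook.upload('target_bucket', 'test.json', '/home/airflow/gcs/data/test.json')

Either function can be called from a PythonOperator; the hook simply centralizes connection and credential handling so each task does not have to set it up itself.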