
In Airflow, I have an XCom value pushed by a task with the ID customer_schema that I want to write to a JSON file named final_schema.json and upload to Google Cloud Storage. My bucket in Google Cloud Storage is named northern_industrial_customer. I tried the following FileToGoogleCloudStorageOperator, but it did not work.

Does anyone know how I can transfer the XCom value from customer_schema to Google Cloud Storage as a JSON file named final_schema.json?

transfer_to_gcs = FileToGoogleCloudStorageOperator(
    task_id='transfer_to_gcs',
    src="{{ task_instance.xcom_pull(task_ids='customer_schema') }}",
    dst='final_schema.json',
    bucket='northern_industrial_customer',
    google_cloud_storage_conn_id=conn_id_gcs)


1 Answer


There is no built-in Airflow operator that uploads in-memory content to GCS: FileToGoogleCloudStorageOperator expects src to be the path of a local file, while your XCom holds the JSON content itself. Airflow is extensible, though, so you can write your own custom operator:

import tempfile

from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ContentToGoogleCloudStorageOperator(BaseOperator):
    """
    Uploads text content to Google Cloud Storage.
    Optionally the content can be compressed for the upload.

    :param content: Content to upload. (templated)
    :type content: str
    :param dst: Destination path within the specified bucket; it must be the full
        path of the destination object on GCS, e.g. `path/to/file.txt` (templated)
    :type dst: str
    :param bucket: The bucket to upload to. (templated)
    :type bucket: str
    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param mime_type: The mime-type string
    :type mime_type: str
    :param delegate_to: The account to impersonate, if any
    :type delegate_to: str
    :param gzip: Allows for file to be compressed and uploaded as gzip
    :type gzip: bool
    """
    template_fields = ('content', 'dst', 'bucket')

    @apply_defaults
    def __init__(self,
                 content,
                 dst,
                 bucket,
                 gcp_conn_id='google_cloud_default',
                 mime_type='application/octet-stream',
                 delegate_to=None,
                 gzip=False,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.content = content
        self.dst = dst
        self.bucket = bucket
        self.gcp_conn_id = gcp_conn_id
        self.mime_type = mime_type
        self.delegate_to = delegate_to
        self.gzip = gzip

    def execute(self, context):
        """
        Uploads the content to Google Cloud Storage
        """
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to
        )

        # NamedTemporaryFile opens in binary mode by default, so the string
        # content must be encoded before writing.
        with tempfile.NamedTemporaryFile(prefix="gcs-local") as file:
            file.write(self.content.encode('utf-8'))
            file.flush()
            hook.upload(
                bucket_name=self.bucket,
                object_name=self.dst,
                mime_type=self.mime_type,
                filename=file.name,
                gzip=self.gzip,
            )

transfer_to_gcs = ContentToGoogleCloudStorageOperator(
    task_id='transfer_to_gcs',
    content="{{ task_instance.xcom_pull(task_ids='customer_schema') }}",
    dst='final_schema.json',
    bucket='northern_industrial_customer',
    gcp_conn_id=conn_id_gcs)
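
For the template above to resolve to valid JSON, the upstream customer_schema task has to push the serialized schema to XCom. Here is a minimal sketch of such a task, assuming customer_schema is a PythonOperator (its return value is pushed to XCom automatically); the schema fields are made up purely for illustration:

import json

from airflow.operators.python_operator import PythonOperator


def build_customer_schema():
    # Hypothetical schema, for illustration only; replace with however
    # your pipeline actually derives it.
    schema = [
        {"name": "customer_id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "created_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
    ]
    # A PythonOperator pushes its callable's return value to XCom, so
    # xcom_pull(task_ids='customer_schema') resolves to this JSON string.
    return json.dumps(schema)


customer_schema = PythonOperator(
    task_id='customer_schema',
    python_callable=build_customer_schema)

customer_schema >> transfer_to_gcs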

Please note that in Airflow 2.0 the google_cloud_storage_conn_id parameter of FileToGoogleCloudStorageOperator has been removed; use gcp_conn_id instead.
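
The operator itself was also renamed in Airflow 2.0: FileToGoogleCloudStorageOperator moved to the Google provider package as LocalFilesystemToGCSOperator (and the hook used by the custom operator above became GCSHook in airflow.providers.google.cloud.hooks.gcs). A minimal sketch under those assumptions; note that it still expects src to be a local file path, so uploading raw XCom content still needs the custom operator:

# Airflow 2.0: operator renamed and moved to the Google provider package.
from airflow.providers.google.cloud.transfers.local_to_gcs import (
    LocalFilesystemToGCSOperator,
)

transfer_file_to_gcs = LocalFilesystemToGCSOperator(
    task_id='transfer_file_to_gcs',
    src='/path/to/local/file.json',  # a local file path, not raw content
    dst='final_schema.json',
    bucket='northern_industrial_customer',
    gcp_conn_id=conn_id_gcs)  # replaces google_cloud_storage_conn_id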