4 votes

I want to use GCSToSFTPOperator in my GCP Composer environment. We are on Airflow version 1.10.3, image composer-1.8.3-airflow-1.10.3 (I upgraded from 1.10.2 to 1.10.3). GCSToSFTPOperator is present in the latest release of Airflow. See the reference below - https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html

I also tried the plugin route: I copied the GCSToSFTPOperator class source code into the plugins folder and imported it in my Python DAG, but I still get an error about airflow.gcp. After that I tried to install the gcp 0.2.1 PyPI package in the Composer environment, but the installation failed there as well.

Step 1 - DAG code, placed in the DAG folder

import os
from airflow import DAG
from airflow import models
from PluginGCSToSFTPOperator import GCSToSFTPOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}

BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"


with models.DAG(
    "example_gcs_to_sftp", default_args=default_args, schedule_interval=None, 
    tags=['example']
) as dag:

    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket=BUCKET_SRC,
        source_object=OBJECT_SRC,
        destination_path=DESTINATION_PATH,
    )

    copy_file_from_gcs_to_sftp

Step 2 - Copied the GCSToSFTPOperator class code into one Python file and placed that file in the plugins folder.

import os
from tempfile import NamedTemporaryFile
from typing import Optional

#from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GCSHook
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"

class GCSToSFTPOperator(BaseOperator):

    template_fields = ("source_bucket", "source_object", "destination_path")

    ui_color = "#f0eee4"

    # pylint: disable=too-many-arguments
    @apply_defaults
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_path: str,
        move_object: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        sftp_conn_id: str = "ssh_default",
        delegate_to: Optional[str] = None,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)

        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to
        self.sftp_dirs = None

    def execute(self, context):
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
        )

        sftp_hook = SFTPHook(self.sftp_conn_id)

        if WILDCARD in self.source_object:
            total_wildcards = self.source_object.count(WILDCARD)
            if total_wildcards > 1:
                raise AirflowException(
                    "Only one wildcard '*' is allowed in source_object parameter. "
                    "Found {} in {}.".format(total_wildcards, self.source_object)
                )

            prefix, delimiter = self.source_object.split(WILDCARD, 1)
            objects = gcs_hook.list(
                self.source_bucket, prefix=prefix, delimiter=delimiter
            )

            for source_object in objects:
                destination_path = os.path.join(self.destination_path, source_object)
                self._copy_single_object(
                    gcs_hook, sftp_hook, source_object, destination_path
                )

            self.log.info(
                "Done. Uploaded '%d' files to %s", len(objects), self.destination_path
            )
        else:
            destination_path = os.path.join(self.destination_path, self.source_object)
            self._copy_single_object(
                gcs_hook, sftp_hook, self.source_object, destination_path
            )
            self.log.info(
                "Done. Uploaded '%s' file to %s", self.source_object, destination_path
            )

    def _copy_single_object(
        self,
        gcs_hook: GCSHook,
        sftp_hook: SFTPHook,
        source_object: str,
        destination_path: str,
    ) -> None:
        """
        Helper function to copy single object.
        """
        self.log.info(
            "Executing copy of gs://%s/%s to %s",
            self.source_bucket,
            source_object,
            destination_path,
        )

        dir_path = os.path.dirname(destination_path)
        sftp_hook.create_directory(dir_path)

        with NamedTemporaryFile("w") as tmp:
            gcs_hook.download(
                bucket_name=self.source_bucket,
                object_name=source_object,
                filename=tmp.name,
            )
            sftp_hook.store_file(destination_path, tmp.name)

        if self.move_object:
            self.log.info(
                "Executing delete of gs://%s/%s", self.source_bucket, source_object
            )
            gcs_hook.delete(self.source_bucket, source_object)

Step 3 - I also tried placing the same file into the DAG folder, but I still get the same error: "No module named 'airflow.gcp'".

What should I try now? Is there an alternative operator, or any other way to use this GCSToSFTPOperator with Airflow 1.10.3?

Comment: See this other answer: stackoverflow.com/a/65849239/2333780 Enjoy! – Pascal GILLET

1 Answer

0 votes

The documentation you were looking at is for Airflow 1.10.7, the latest version. If you refer to the Airflow 1.10.2 or 1.10.3 documentation, you will see that the gcs_to_sftp operator is not present in those versions.

What you can try is to copy the code, make a plugin out of it, and put it into the plugins directory in the Composer instance bucket. If you still have problems with it, please share all the steps you have taken so far and I will try to help you.
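
As a rough, untested sketch of what such a plugin file could look like: the same operator, but with the imports swapped for hooks that do exist in Airflow 1.10.3 - airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook instead of airflow.gcp.hooks.gcs.GCSHook, and airflow.contrib.hooks.sftp_hook.SFTPHook instead of the providers package. The file name gcs_to_sftp_backport.py is just a placeholder, and the body is a simplified version of the upstream operator, not a drop-in copy:

# plugins/gcs_to_sftp_backport.py  (file name is a placeholder)
# Sketch of the operator rewritten against the hooks shipped with Airflow 1.10.3.
import os
from tempfile import NamedTemporaryFile

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook  # 1.10.x GCS hook
from airflow.contrib.hooks.sftp_hook import SFTPHook               # 1.10.x SFTP hook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"


class GCSToSFTPOperator(BaseOperator):
    """Copy (or move) objects from Google Cloud Storage to an SFTP server."""

    template_fields = ("source_bucket", "source_object", "destination_path")
    ui_color = "#f0eee4"

    @apply_defaults
    def __init__(self, source_bucket, source_object, destination_path,
                 move_object=False, gcp_conn_id="google_cloud_default",
                 sftp_conn_id="ssh_default", delegate_to=None, *args, **kwargs):
        super(GCSToSFTPOperator, self).__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
        )
        sftp_hook = SFTPHook(self.sftp_conn_id)

        if WILDCARD in self.source_object:
            if self.source_object.count(WILDCARD) > 1:
                raise AirflowException(
                    "Only one wildcard '*' is allowed in source_object.")
            prefix, delimiter = self.source_object.split(WILDCARD, 1)
            objects = gcs_hook.list(
                self.source_bucket, prefix=prefix, delimiter=delimiter)
            for source_object in objects:
                self._copy_single_object(gcs_hook, sftp_hook, source_object)
        else:
            self._copy_single_object(gcs_hook, sftp_hook, self.source_object)

    def _copy_single_object(self, gcs_hook, sftp_hook, source_object):
        destination_path = os.path.join(self.destination_path, source_object)
        self.log.info("Copying gs://%s/%s to %s",
                      self.source_bucket, source_object, destination_path)
        sftp_hook.create_directory(os.path.dirname(destination_path))
        with NamedTemporaryFile("w") as tmp:
            # The 1.10.x contrib hook takes 'bucket'/'object' (not bucket_name/
            # object_name) and writes the downloaded bytes to 'filename' itself.
            gcs_hook.download(bucket=self.source_bucket,
                              object=source_object,
                              filename=tmp.name)
            sftp_hook.store_file(destination_path, tmp.name)
        if self.move_object:
            gcs_hook.delete(self.source_bucket, source_object)

If this file sits in the plugins folder of the environment bucket, the DAG from your step 1 should be able to import it directly (e.g. from gcs_to_sftp_backport import GCSToSFTPOperator), since the plugins directory is on the Python path. I have not run this on Composer 1.8.3, so treat it as a starting point rather than a finished implementation.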

You can also read more about upgrading the Airflow version in Composer.