I want to use GCSToSFTPOperator in my GCP Composer environment. We are on Airflow version 1.10.3, composer-1.8.3-airflow-1.10.3 (I upgraded from 1.10.2 to 1.10.3). GCSToSFTPOperator is only present in the latest release of Airflow.
See this reference:
https://airflow.readthedocs.io/en/latest/_modules/airflow/operators/gcs_to_sftp.html
I also tried the plugin approach: I copied the GCSToSFTPOperator class source code into the plugins folder and imported it in my Python DAG, but I am still getting an error about airflow.gcp. After that I tried to install the gcp 0.2.1 PyPI package in the Composer environment, but that also failed with an install error.
Step 1 - Create the DAG code and place it in the DAG folder:
import os

from airflow import DAG
from airflow import models
from airflow.utils.dates import days_ago

from PluginGCSToSFTPOperator import GCSToSFTPOperator

default_args = {"start_date": days_ago(1)}

BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"

with models.DAG(
    "example_gcs_to_sftp",
    default_args=default_args,
    schedule_interval=None,
    tags=['example'],
) as dag:
    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp",
        source_bucket=BUCKET_SRC,
        source_object=OBJECT_SRC,
        destination_path=DESTINATION_PATH,
    )

    copy_file_from_gcs_to_sftp
Step 2 - Copy the GCSToSFTPOperator class source code into a Python file and place that file in the plugins folder:
import os
from tempfile import NamedTemporaryFile
from typing import Optional

# from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GCSHook
from airflow.models import BaseOperator
from airflow.providers.sftp.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults

WILDCARD = "*"


class GCSToSFTPOperator(BaseOperator):
    template_fields = ("source_bucket", "source_object", "destination_path")
    ui_color = "#f0eee4"

    # pylint: disable=too-many-arguments
    @apply_defaults
    def __init__(
        self,
        source_bucket: str,
        source_object: str,
        destination_path: str,
        move_object: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        sftp_conn_id: str = "ssh_default",
        delegate_to: Optional[str] = None,
        *args,
        **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to
        self.sftp_dirs = None

    def execute(self, context):
        gcs_hook = GCSHook(
            gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
        )
        sftp_hook = SFTPHook(self.sftp_conn_id)

        if WILDCARD in self.source_object:
            total_wildcards = self.source_object.count(WILDCARD)
            if total_wildcards > 1:
                raise AirflowException(
                    "Only one wildcard '*' is allowed in source_object parameter. "
                    "Found {} in {}.".format(total_wildcards, self.source_object)
                )

            prefix, delimiter = self.source_object.split(WILDCARD, 1)
            objects = gcs_hook.list(
                self.source_bucket, prefix=prefix, delimiter=delimiter
            )

            for source_object in objects:
                destination_path = os.path.join(self.destination_path, source_object)
                self._copy_single_object(
                    gcs_hook, sftp_hook, source_object, destination_path
                )

            self.log.info(
                "Done. Uploaded '%d' files to %s", len(objects), self.destination_path
            )
        else:
            destination_path = os.path.join(self.destination_path, self.source_object)
            self._copy_single_object(
                gcs_hook, sftp_hook, self.source_object, destination_path
            )
            self.log.info(
                "Done. Uploaded '%s' file to %s", self.source_object, destination_path
            )

    def _copy_single_object(
        self,
        gcs_hook: GCSHook,
        sftp_hook: SFTPHook,
        source_object: str,
        destination_path: str,
    ) -> None:
        """
        Helper function to copy single object.
        """
        self.log.info(
            "Executing copy of gs://%s/%s to %s",
            self.source_bucket,
            source_object,
            destination_path,
        )

        dir_path = os.path.dirname(destination_path)
        sftp_hook.create_directory(dir_path)

        with NamedTemporaryFile("w") as tmp:
            gcs_hook.download(
                bucket_name=self.source_bucket,
                object_name=source_object,
                filename=tmp.name,
            )
            sftp_hook.store_file(destination_path, tmp.name)

        if self.move_object:
            self.log.info(
                "Executing delete of gs://%s/%s", self.source_bucket, source_object
            )
            gcs_hook.delete(self.source_bucket, source_object)
Step 3 - I also tried placing the same file into the DAG folder, but I am still getting the same error: "No module named 'airflow.gcp'".
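From the error message my guess is that airflow.gcp and airflow.providers simply do not exist in Airflow 1.10.3, so the copied file would have to be rewritten against the modules that 1.10.3 actually ships (I believe the contrib hooks). Below is a trimmed-down sketch of what I have in mind, without the wildcard handling; the contrib class names, constructor arguments and method signatures are my assumptions from reading the 1.10.3 docs and are not verified on Composer:

import os
from tempfile import NamedTemporaryFile

# 1.10.3 module layout (my assumption): the hooks live under airflow.contrib
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class GCSToSFTPOperator(BaseOperator):
    """Simplified copy of the upstream operator, adapted to what I think the 1.10.3 API is."""

    template_fields = ("source_bucket", "source_object", "destination_path")
    ui_color = "#f0eee4"

    @apply_defaults
    def __init__(self,
                 source_bucket,
                 source_object,
                 destination_path,
                 move_object=False,
                 gcp_conn_id="google_cloud_default",
                 sftp_conn_id="ssh_default",
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(GCSToSFTPOperator, self).__init__(*args, **kwargs)
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.destination_path = destination_path
        self.move_object = move_object
        self.gcp_conn_id = gcp_conn_id
        self.sftp_conn_id = sftp_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        # the 1.10.x hook takes google_cloud_storage_conn_id, not gcp_conn_id (my understanding)
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
        )
        sftp_hook = SFTPHook(self.sftp_conn_id)

        destination_path = os.path.join(self.destination_path, self.source_object)
        sftp_hook.create_directory(os.path.dirname(destination_path))

        with NamedTemporaryFile("w") as tmp:
            # download()/delete() take bucket and object arguments in 1.10.x,
            # not bucket_name/object_name (again, my assumption)
            gcs_hook.download(self.source_bucket, self.source_object, tmp.name)
            sftp_hook.store_file(destination_path, tmp.name)

        if self.move_object:
            gcs_hook.delete(self.source_bucket, self.source_object)

        self.log.info("Copied gs://%s/%s to %s",
                      self.source_bucket, self.source_object, destination_path)


class GcsToSftpPlugin(AirflowPlugin):
    # registers the operator as a plugin; the plugin name is my own choice
    name = "gcs_to_sftp_plugin"
    operators = [GCSToSFTPOperator]

I have not deployed this yet, so I do not know whether it actually works on Composer.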
What should I try now? Is there an alternative operator, or any other way to use GCSToSFTPOperator on Airflow 1.10.3?
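For example, would it work to skip the custom operator entirely and call the hooks directly from a plain PythonOperator, roughly like the sketch below? The contrib import paths and hook calls are again my assumptions about the 1.10.3 API, and the connection IDs are the same ones used above; I have not tested this.

import os
from tempfile import NamedTemporaryFile

from airflow import models
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

BUCKET_SRC = "bucket-name"
OBJECT_SRC = "parent-1.bin"
DESTINATION_PATH = "/tmp/single-file/"


def copy_gcs_object_to_sftp():
    gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id="google_cloud_default")
    sftp_hook = SFTPHook("ssh_default")

    destination_path = os.path.join(DESTINATION_PATH, OBJECT_SRC)
    sftp_hook.create_directory(os.path.dirname(destination_path))

    with NamedTemporaryFile("w") as tmp:
        # my understanding of the 1.10.x signature: download(bucket, object, filename)
        gcs_hook.download(BUCKET_SRC, OBJECT_SRC, tmp.name)
        sftp_hook.store_file(destination_path, tmp.name)


with models.DAG(
    "example_gcs_to_sftp_python_operator",
    default_args={"start_date": days_ago(1)},
    schedule_interval=None,
) as dag:
    copy_task = PythonOperator(
        task_id="file-copy-gcs-to-sftp",
        python_callable=copy_gcs_object_to_sftp,
    )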