Here I want to use SFTPToGCSOperator in composer enviornment(1.10.6) of GCP. I know there is a limitation because The operator present only in latest version of airflow not in composer latest version 1.10.6.
See the refrence - https://airflow.readthedocs.io/en/latest/howto/operator/gcp/sftp_to_gcs.html
I found the alternative of operator and I created a plugin class, But again I faced the issue for sftphook class, Now I am using older version of sftphook class.
see the below refrence -
from airflow.contrib.hooks.sftp_hook import SFTPHook https://airflow.apache.org/docs/stable/_modules/airflow/contrib/hooks/sftp_hook.html
I have created a plugin class, later It's import in my DAG script. It's working fine only when we are moveing one file, In that case we need to pass complete file path with extension.
Please refer below example(It's working fine in this scenrio)
DIR = "/test/sftp_dag_test/source_dir"
OBJECT_SRC_1 = "file.csv"
source_path=os.path.join(DIR, OBJECT_SRC_1),
Except this If we are using wildcard, I mean if we want to move all the files from directory I am getting error for get_tree_map method.
Please see below DAG code
import os
from airflow import models
from airflow.models import Variable
from PluginSFTPToGCSOperator import SFTPToGCSOperator
#from airflow.contrib.operators.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago
default_args = {"start_date": days_ago(1)}
DIR_path = "/main_dir/sub_dir/"
BUCKET_SRC = "test-gcp-bucket"
with models.DAG(
"dag_sftp_to_gcs", default_args=default_args, schedule_interval=None
) as dag:
copy_sftp_to_gcs = SFTPToGCSOperator(
task_id="t_sftp_to_gcs",
sftp_conn_id="test_sftp_conn",
gcp_conn_id="google_cloud_default",
source_path=os.path.join(DIR_path, "*.gz"),
destination_bucket=BUCKET_SRC,
)
copy_sftp_to_gcs
Here we are using wildcard * in DAG script, please see below plugin class.
import os
from tempfile import NamedTemporaryFile
from typing import Optional, Union
from airflow.plugins_manager import AirflowPlugin
from airflow import AirflowException
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.utils.decorators import apply_defaults
WILDCARD = "*"
class SFTPToGCSOperator(BaseOperator):
template_fields = ("source_path", "destination_path", "destination_bucket")
@apply_defaults
def __init__(
self,
source_path: str,
destination_bucket: str = "destination_bucket",
destination_path: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
sftp_conn_id: str = "sftp_conn_plugin",
delegate_to: Optional[str] = None,
mime_type: str = "application/octet-stream",
gzip: bool = False,
move_object: bool = False,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.source_path = source_path
self.destination_path = self._set_destination_path(destination_path)
print('destination_bucket : ',destination_bucket)
self.destination_bucket = destination_bucket
self.gcp_conn_id = gcp_conn_id
self.mime_type = mime_type
self.delegate_to = delegate_to
self.gzip = gzip
self.sftp_conn_id = sftp_conn_id
self.move_object = move_object
def execute(self, context):
print("inside execute")
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
sftp_hook = SFTPHook(self.sftp_conn_id)
if WILDCARD in self.source_path:
total_wildcards = self.source_path.count(WILDCARD)
if total_wildcards > 1:
raise AirflowException(
"Only one wildcard '*' is allowed in source_path parameter. "
"Found {} in {}.".format(total_wildcards, self.source_path)
)
print('self.source_path : ',self.source_path)
prefix, delimiter = self.source_path.split(WILDCARD, 1)
print('prefix : ',prefix)
base_path = os.path.dirname(prefix)
print('base_path : ',base_path)
files, _, _ = sftp_hook.get_tree_map(
base_path, prefix=prefix, delimiter=delimiter
)
for file in files:
destination_path = file.replace(base_path, self.destination_path, 1)
self._copy_single_object(gcs_hook, sftp_hook, file, destination_path)
else:
destination_object = (
self.destination_path
if self.destination_path
else self.source_path.rsplit("/", 1)[1]
)
self._copy_single_object(
gcs_hook, sftp_hook, self.source_path, destination_object
)
def _copy_single_object(
self,
gcs_hook: GoogleCloudStorageHook,
sftp_hook: SFTPHook,
source_path: str,
destination_object: str,
) -> None:
"""
Helper function to copy single object.
"""
self.log.info(
"Executing copy of %s to gs://%s/%s",
source_path,
self.destination_bucket,
destination_object,
)
with NamedTemporaryFile("w") as tmp:
sftp_hook.retrieve_file(source_path, tmp.name)
print('before upload self det object : ',self.destination_bucket)
gcs_hook.upload(
self.destination_bucket,
destination_object,
tmp.name,
self.mime_type,
)
if self.move_object:
self.log.info("Executing delete of %s", source_path)
sftp_hook.delete_file(source_path)
@staticmethod
def _set_destination_path(path: Union[str, None]) -> str:
if path is not None:
return path.lstrip("/") if path.startswith("/") else path
return ""
@staticmethod
def _set_bucket_name(name: str) -> str:
bucket = name if not name.startswith("gs://") else name[5:]
return bucket.strip("/")
class SFTPToGCSOperatorPlugin(AirflowPlugin):
name = "SFTPToGCSOperatorPlugin"
operators = [SFTPToGCSOperator]
So this plugin class I am importing in my DAG script and it's wotking fine when we are using file name, Because code is going inside else condition.
But when we are using wildcard we have cursor inside if condition and I am getting error for get_tree_map method.
see below error -
ERROR - 'SFTPHook' object has no attribute 'get_tree_map'
I found the reason of this error this method itself is not present in composer(airflow 1.10.6)- https://airflow.apache.org/docs/stable/_modules/airflow/contrib/hooks/sftp_hook.html
This method is present in latest version of airflow https://airflow.readthedocs.io/en/latest/_modules/airflow/providers/sftp/hooks/sftp.html
Now What should I can try, Is there any alternative of this method or any alternative of this operator class.
Does anyone know if there is a solution for this?
Thanks in Advance.
Please ignore Typo or indentation error in stackoverflow. In my code there is no Indentation error.