I am trying to read a file from Blob Storage, load it into pandas, and write it back to Blob Storage. I have an Azure Machine Learning pipeline with a PythonScriptStep that takes two PipelineParameters, both of which are DataPaths, as shown below.
```python
from azureml.core import Datastore, Experiment
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import Pipeline, PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

datastore = Datastore(ws, "SampleStore")

in_raw_path_default = 'somefolder/raw/alerts/2020/08/03/default_in.csv'
in_cleaned_path_default = 'somefolder/cleaned/alerts/2020/08/03/default_out.csv'

in_raw_datapath = DataPath(datastore=datastore, path_on_datastore=in_raw_path_default)
in_raw_path_pipelineparam = PipelineParameter(name="inrawpath", default_value=in_raw_datapath)
raw_datapath_input = (in_raw_path_pipelineparam, DataPathComputeBinding(mode='mount'))

in_cleaned_datapath = DataPath(datastore=datastore, path_on_datastore=in_cleaned_path_default)
in_cleaned_path_pipelineparam = PipelineParameter(name="incleanedpath", default_value=in_cleaned_datapath)
cleaned_datapath_input = (in_cleaned_path_pipelineparam, DataPathComputeBinding(mode='mount'))

source_directory = script_folder + '/pipeline_Steps'
dataprep_step = PythonScriptStep(
    script_name="SimpleTest.py",
    arguments=["--input_data", raw_datapath_input, "--cleaned_data", cleaned_datapath_input],
    inputs=[raw_datapath_input, cleaned_datapath_input],
    compute_target=default_compute,
    source_directory=source_directory,
    runconfig=run_config,
    allow_reuse=True
)

pipeline_test = Pipeline(workspace=ws, steps=[dataprep_step])

test_raw_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/raw/alerts/2017/05/31/test.csv')
test_cleaned_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/cleaned/alerts/2020/09/03')

pipeline_run_msalerts = Experiment(ws, 'SampleExperiment').submit(
    pipeline_test,
    pipeline_parameters={"inrawpath": test_raw_path, "incleanedpath": test_cleaned_path})
```
This is the script used (SimpleTest.py):
```python
import os
import argparse
import pandas as pd

parser = argparse.ArgumentParser("datapreponly")
parser.add_argument("--input_data", type=str)
parser.add_argument("--cleaned_data", type=str)
args = parser.parse_args()

print("Argument 1: %s" % args.input_data)
print("Argument 2: %s" % args.cleaned_data)

testDf = pd.read_csv(args.input_data, error_bad_lines=False)
print('Total Data Shape' + str(testDf.shape))

if args.cleaned_data is not None:
    output_path = args.cleaned_data
    os.makedirs(output_path, exist_ok=True)
    outdatapath = output_path + '/alert.csv'
    testDf.to_csv(outdatapath, index=False)
```
Triggering this Azure ML pipeline from Azure Data Factory:

The code above works fine when the pipeline is executed from the Azure ML workspace / Pipeline SDK. I am now trying to trigger the pipeline from an Azure Data Factory Machine Learning Execute Pipeline activity. I tried a debug run, passing the two input paths as strings:
```
rawdatapath = "samplefolder/raw/alerts/2017/05/31/test.csv"
cleaneddatapath = "samplefolder/raw/cleaned/2020/09/03/"
```
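For context: as far as I understand, the ADF activity calls the published pipeline's REST endpoint, where parameter values can only travel as JSON strings in `ParameterAssignments` (not as DataPath objects). A minimal sketch of the request body I believe is being sent (the field names follow the published-pipeline REST contract; the exact payload ADF builds is my assumption):

```python
import json

# Hypothetical sketch of the body the ADF Machine Learning Execute Pipeline
# activity submits to the published pipeline's REST endpoint. Note that every
# parameter value is a plain JSON string, not a DataPath.
payload = {
    "ExperimentName": "SampleExperiment",
    "ParameterAssignments": {
        "inrawpath": "samplefolder/raw/alerts/2017/05/31/test.csv",
        "incleanedpath": "samplefolder/raw/cleaned/2020/09/03/",
    },
}
body = json.dumps(payload)
print(body)
```

If this is accurate, it would explain why a DataPath-typed PipelineParameter cannot be overridden from ADF.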
```
Current directory: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/workspaceblobstore/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade
Preparing to call script [ SimpleTest.py ]
with arguments:
['--input_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv',
'--cleaned_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']
After variable expansion, calling script [ SimpleTest.py ] with arguments:
['--input_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv',
'--cleaned_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']
Script type = None
Argument 1: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv
Argument 2: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv
.......................
FileNotFoundError: [Errno 2] No such file or directory: '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv'
```
The log shows that the default paths were used instead of the pipeline parameters (the FileNotFoundError itself is secondary; the key point is that the defaults were used rather than the values I passed). I suspect this happens because the pipeline parameters are passed as strings instead of DataPaths.
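One workaround I am considering (unverified): since the ADF activity can apparently only supply string values, keep the paths as plain string PipelineParameters and mount the datastore root as a regular input, then join the relative path onto the mount point inside the script. A minimal sketch of the script side, where the argument names (`--datastore_mount`, `--input_data`) and the sample values are my own placeholders:

```python
import os
import argparse

# Hypothetical string-only variant of SimpleTest.py: the datastore root is
# mounted as a normal input, and the file path arrives as a plain string that
# is joined onto the mount point at runtime.
parser = argparse.ArgumentParser("datapreponly")
parser.add_argument("--datastore_mount", type=str)  # mounted datastore root
parser.add_argument("--input_data", type=str)       # relative path, plain string
args = parser.parse_args([
    "--datastore_mount", "/mnt/SampleStore",
    "--input_data", "samplefolder/raw/alerts/2017/05/31/test.csv",
])

# Resolve the actual file location under the mount.
input_path = os.path.join(args.datastore_mount, args.input_data)
print(input_path)  # /mnt/SampleStore/samplefolder/raw/alerts/2017/05/31/test.csv
```

This sidesteps DataPath overriding entirely, at the cost of mounting more of the datastore than strictly needed.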
FINALLY, THE QUESTION: how can a DataPath be passed to the Azure Machine Learning Execute Pipeline activity from Azure Data Factory?

Thanks.