3
votes

  • I am trying to read a file from Blob Storage, load it into pandas, and write the result back to Blob Storage.

  • I have an Azure Machine Learning pipeline with a PythonScriptStep that takes two PipelineParameters, both of which are DataPaths, as shown below.

    from azureml.core import Datastore
    from azureml.data.datapath import DataPath, DataPathComputeBinding, DataReference
    from azureml.pipeline.core import PipelineParameter
    
    datastore = Datastore(ws, "SampleStore")
    in_raw_path_default = 'somefolder/raw/alerts/2020/08/03/default_in.csv'
    in_cleaned_path_default= 'somefolder/cleaned/alerts/2020/08/03/default_out.csv'
    
    in_raw_datapath = DataPath(datastore=datastore, path_on_datastore=in_raw_path_default)
    in_raw_path_pipelineparam = PipelineParameter(name="inrawpath", default_value=in_raw_datapath)
    raw_datapath_input = (in_raw_path_pipelineparam, DataPathComputeBinding(mode='mount'))
    
    in_cleaned_datapath = DataPath(datastore=datastore, path_on_datastore=in_cleaned_path_default)
    in_cleaned_path_pipelineparam = PipelineParameter(name="incleanedpath", default_value=in_cleaned_datapath)
    cleaned_datapath_input = (in_cleaned_path_pipelineparam, DataPathComputeBinding(mode='mount'))
    
    from azureml.pipeline.steps import PythonScriptStep
    
    source_directory = script_folder + '/pipeline_Steps'
    dataprep_step = PythonScriptStep(
        script_name="SimpleTest.py", 
        arguments=["--input_data", raw_datapath_input, "--cleaned_data", cleaned_datapath_input],
        inputs=[raw_datapath_input, cleaned_datapath_input],    
        compute_target=default_compute, 
        source_directory=source_directory,
        runconfig=run_config,
        allow_reuse=True
    )
    
    from azureml.pipeline.core import Pipeline
    pipeline_test = Pipeline(workspace=ws, steps=[dataprep_step])
    
    test_raw_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/raw/alerts/2017/05/31/test.csv')
    test_cleaned_path = DataPath(datastore=datastore, path_on_datastore='samplefolder/cleaned/alerts/2020/09/03')
    from azureml.core import Experiment

    # Override both DataPath defaults when submitting the run.
    pipeline_run_msalerts = Experiment(ws, 'SampleExperiment').submit(
        pipeline_test,
        pipeline_parameters={"inrawpath": test_raw_path, "incleanedpath": test_cleaned_path},
    )
    
    

This is the script used (SimpleTest.py):

    import os
    import sys
    import argparse
    import pathlib
    import azureml.core
    import pandas as pd

    parser = argparse.ArgumentParser("datapreponly")
    parser.add_argument("--input_data", type=str)
    parser.add_argument("--cleaned_data", type=str)

    args = parser.parse_args()

    print("Argument 1: %s" % args.input_data)
    print("Argument 2: %s" % args.cleaned_data)

    # error_bad_lines was deprecated in pandas 1.3 and removed in 2.0;
    # on newer versions use on_bad_lines="skip" instead.
    testDf = pd.read_csv(args.input_data, error_bad_lines=False)
    print('Total Data Shape ' + str(testDf.shape))

    if args.cleaned_data is not None:
        # The cleaned path is treated as a directory; the file name is fixed.
        output_path = args.cleaned_data
        os.makedirs(output_path, exist_ok=True)
        outdatapath = os.path.join(output_path, 'alert.csv')
        testDf.to_csv(outdatapath, index=False)
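
One detail in the script above may be worth guarding against: the default value for the cleaned path ends in `default_out.csv`, while the script treats `--cleaned_data` as a directory, so `os.makedirs` would create a directory with that name. The following is only a sketch of one way to accept either form. The helper name `resolve_output_file` and the `.csv` suffix check are assumptions for illustration, not part of the original script.

```python
import os


def resolve_output_file(cleaned_path, default_name="alert.csv"):
    """Return the file path to write to, creating parent directories.

    Hypothetical helper: if cleaned_path ends in .csv it is treated as the
    target file; otherwise it is treated as a directory and default_name
    is appended.
    """
    if cleaned_path.lower().endswith(".csv"):
        out_dir = os.path.dirname(cleaned_path)
        out_file = cleaned_path
    else:
        out_dir = cleaned_path
        out_file = os.path.join(cleaned_path, default_name)
    if out_dir:
        # Create the containing directory only, never the file name itself.
        os.makedirs(out_dir, exist_ok=True)
    return out_file
```

In the script, `testDf.to_csv(resolve_output_file(args.cleaned_data), index=False)` would then work for both the file-style default and the folder-style test path.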

Triggering this Azure ML pipeline from Azure Data Factory:
The code above works fine when the ML pipeline is executed from the Azure ML workspace or the pipeline SDK. I am now trying to trigger the same pipeline from Azure Data Factory using the AzureMachineLearningExecutePipeline activity.

I tried a debug run, passing two string input paths:
rawdatapath = "samplefolder/raw/alerts/2017/05/31/test.csv"
cleaneddatapath = "samplefolder/raw/cleaned/2020/09/03/"


Current directory:  /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/workspaceblobstore/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade
Preparing to call script [ SimpleTest.py ] 
with arguments:
 ['--input_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv',
 '--cleaned_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']
After variable expansion, calling script [ SimpleTest.py ] with arguments:
 ['--input_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv',
 '--cleaned_data', '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv']

Script type = None
Argument 1: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv
Argument 2: /mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/cleaned/alerts/2020/08/03/default_out.csv
.......................
FileNotFoundError: [Errno 2] No such file or directory: '/mnt/batch/tasks/shared/LS_root/jobs/myazuremlworkspace/azureml/d8ee11ea-5838-46e5-a8ce-da2fbff5aade/mounts/SampleStore/somefolder/raw/alerts/2020/08/03/default_in.csv'

The log shows that the default path is used instead of the pipeline parameter. (The "No such file or directory" error itself is less important; the main point is that the default path is taken instead of the value I passed.) I suspect this happens because I am passing the pipeline parameter as a string instead of a DataPath.


The question: how can I pass a DataPath to an Azure ML pipeline activity from Azure Data Factory?


Thanks.


3 Answers

2
votes

This notebook demonstrates the use of DataPath and PipelineParameters in AML Pipelines. It shows how strings and DataPaths can be parameterized and submitted to AML Pipelines via PipelineParameters. You can also parameterize the input dataset, and the sample notebook shows how to do that.

Currently, ParallelRunStep accepts a dataset as its data input. You can add one more step before ParallelRunStep that creates a dataset object pointing to the new data and passes it to ParallelRunStep. Here's an example of using multiple steps:

For output, if you use the append_row output action, you can customize the output file name through the append_row_file_name config. The output is stored in the default blob store. To move it to another store, we suggest adding a DataTransferStep after ParallelRunStep. Please follow this example for the data transfer step:

2
votes

I got an answer from Microsoft (please refer to this thread here). The Azure Data Factory product team confirms that Azure Data Factory (ADF) does not currently support a "DataPath" parameter type. However, a feature request for it has already been raised and work is in progress. The feature is expected to be part of the November release.
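
Until the activity supports DataPath directly, one possible workaround is to call the published pipeline's REST endpoint (for example from an ADF Web activity) and supply the paths in the request body. The sketch below only builds that body. It assumes the body shape used in Microsoft's DataPath sample notebook, with a `DataPathAssignments` map of `DataStoreName` and `RelativePath` entries, so please verify it against the current documentation. The function name `build_pipeline_request_body` is hypothetical; the datastore, experiment, and parameter names are taken from the question.

```python
import json


def build_pipeline_request_body(experiment_name, datapath_assignments):
    """Build a request body for submitting a published pipeline run.

    datapath_assignments maps a pipeline parameter name to a
    (datastore_name, relative_path) tuple. The key names used here
    are an assumption based on the published-pipeline REST examples.
    """
    return {
        "ExperimentName": experiment_name,
        "RunSource": "SDK",
        "DataPathAssignments": {
            name: {"DataStoreName": store, "RelativePath": path}
            for name, (store, path) in datapath_assignments.items()
        },
    }


body = build_pipeline_request_body(
    "SampleExperiment",
    {
        "inrawpath": ("SampleStore", "samplefolder/raw/alerts/2017/05/31/test.csv"),
        "incleanedpath": ("SampleStore", "samplefolder/cleaned/alerts/2020/09/03"),
    },
)
print(json.dumps(body, indent=2))
```

The printed JSON would be sent as the POST body to the published pipeline endpoint, together with an Azure AD bearer token in the Authorization header.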

1
votes

The input parameters seem to be defined as strings; please try changing them to the Object data type. According to the documentation, the activity expects parameters as an object of the form {"Key" : "value"}.
