
I am building an Azure ML pipeline with the azureml Python SDK. The pipeline runs a PythonScriptStep that stores data on the workspaceblobstore of the AML workspace.

I would like to extend the pipeline to export the pipeline data to an Azure Data Lake (Gen 1). As far as I understand, connecting the output of the PythonScriptStep directly to Azure Data Lake (Gen 1) is not supported by Azure ML. Therefore, I added an extra DataTransferStep to the pipeline that takes the output of the PythonScriptStep directly as its input. According to the Microsoft documentation this should be possible.

I have built this solution, but it only results in a file of 0 bytes on the Gen 1 Data Lake. I think the output_export_blob PipelineData does not correctly reference the test.csv, and therefore the DataTransferStep cannot find the input. How can I connect the DataTransferStep correctly to the PipelineData output of the PythonScriptStep?

Example I followed: https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.ipynb

pipeline.py

import os

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import DataTransferStep, PythonScriptStep

# delimited_dataset is a helper defined elsewhere that builds a TabularDataset
# from the delimited files on the datastore
input_dataset = delimited_dataset(
    datastore=prdadls_datastore,
    folderpath=FOLDER_PATH_INPUT,
    filepath=INPUT_PATH
)

output_export_blob = PipelineData(
    'export_blob',
    datastore=workspaceblobstore_datastore,
)

test_step = PythonScriptStep(
    script_name="test_upload_stackoverflow.py",
    arguments=[
        "--output_extract", output_export_blob,
    ],
    inputs=[
        input_dataset.as_named_input('input'),
    ],
    outputs=[output_export_blob],
    compute_target=aml_compute,
    source_directory="."
)

output_export_adls = DataReference(
    datastore=prdadls_datastore, 
    path_on_datastore=os.path.join(FOLDER_PATH_OUTPUT, 'test.csv'),
    data_reference_name='export_adls'        
)

export_to_adls = DataTransferStep(
    name='export_output_to_adls',
    source_data_reference=output_export_blob,
    source_reference_type='file',
    destination_data_reference=output_export_adls,
    compute_target=adf_compute
)

pipeline = Pipeline(
    workspace=aml_workspace, 
    steps=[
        test_step, 
        export_to_adls
    ]
)
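
from azureml.core import Experiment

# Submit the pipeline (rough sketch; the experiment name 'export-test' is arbitrary)
pipeline_run = Experiment(aml_workspace, 'export-test').submit(pipeline)
pipeline_run.wait_for_completion()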

test_upload_stackoverflow.py

import argparse
import os
import pathlib
from azureml.core import Datastore, Run

parser = argparse.ArgumentParser("train")
parser.add_argument("--output_extract", type=str)
args = parser.parse_args() 

run = Run.get_context()
df_data_all = (
    run
    .input_datasets["input"]
    .to_pandas_dataframe()
)

os.makedirs(args.output_extract, exist_ok=True)
df_data_all.to_csv(
    os.path.join(args.output_extract, "test.csv"), 
    index=False
)

1 Answer


The code example is immensely helpful, thanks for that. You're right that it can be confusing to get PythonScriptStep -> PipelineData working initially, even without the DataTransferStep.

I don't know 100% what's going on, but I thought I'd spitball some ideas:

  1. Does your PipelineData, export_blob, actually contain the "test.csv" file? I would verify that before troubleshooting the DataTransferStep. You can verify this using the SDK (see the first sketch after this list), or more easily with the UI:
    1. Go to the PipelineRun page and click on the PythonScriptStep in question.
    2. On the "Outputs + Logs" page, there's a "Data Outputs" section (which is slow to load initially).
    3. Open it and you'll see the output PipelineDatas; then click "View Output".
    4. Navigate to the given path in either the Azure Portal or Azure Storage Explorer.
  2. In test_upload_stackoverflow.py you are treating the PipelineData as a directory when you call .to_csv(), as opposed to as a file, which would just be df_data_all.to_csv(args.output_extract, index=False). Perhaps try defining the PipelineData with is_directory=True (see the second sketch below). I'm not sure if this is required, though.
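
Here's a minimal sketch of the SDK route for point 1. It assumes the step name defaults to the script name (since no explicit name was set) and uses placeholder values for the experiment name and run ID:

from azureml.core import Experiment
from azureml.pipeline.core import PipelineRun

# Placeholders: substitute your experiment name and the pipeline run ID
experiment = Experiment(aml_workspace, "my-experiment")
pipeline_run = PipelineRun(experiment, run_id="<pipeline-run-id>")

# With no explicit name given, the PythonScriptStep should be named after its script
step_run = pipeline_run.find_step_run("test_upload_stackoverflow.py")[0]

# Download the 'export_blob' output port locally and check whether test.csv is in it
port_data_ref = step_run.get_output_data("export_blob")
port_data_ref.download(local_path=".")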
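
And for point 2, a hypothetical sketch of the two variants (names taken from your snippets, not tested against your pipeline):

# Variant A: declare the output explicitly as a directory in pipeline.py and
# keep the script's os.makedirs / to_csv(os.path.join(...)) pattern as-is
output_export_blob = PipelineData(
    'export_blob',
    datastore=workspaceblobstore_datastore,
    is_directory=True,
)

# Variant B: treat the output as a single file instead -- in
# test_upload_stackoverflow.py write straight to the path passed in
df_data_all.to_csv(args.output_extract, index=False)

If you go with Variant B, it would at least be consistent with the source_reference_type='file' you pass to the DataTransferStep.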