I am building an Azure ML pipeline with the azureml Python SDK. The pipeline runs a PythonScriptStep that writes its output to the workspaceblobstore datastore of the AML workspace.
I would like to extend the pipeline so that it exports this pipeline data to an Azure Data Lake (Gen 1). As far as I understand, Azure ML does not support connecting the output of a PythonScriptStep directly to Azure Data Lake (Gen 1). I therefore added an extra DataTransferStep to the pipeline, which takes the output of the PythonScriptStep as its input. According to the Microsoft documentation, this should be possible.
This is the solution I have built so far, but it results in a 0-byte file on the Gen 1 Data Lake. I think the output_export_blob PipelineData does not correctly reference test.csv, so the DataTransferStep cannot find its input. How can I correctly connect the DataTransferStep to the PipelineData output of the PythonScriptStep?
Example I followed: https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.ipynb
pipeline.py
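import os

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import DataTransferStep, PythonScriptStep

# delimited_dataset, the datastores, the compute targets, aml_workspace and the
# FOLDER_PATH_* / INPUT_PATH constants are defined elsewhere (not shown).

# Input dataset read from the Gen 1 Data Lake
input_dataset = delimited_dataset(
    datastore=prdadls_datastore,
    folderpath=FOLDER_PATH_INPUT,
    filepath=INPUT_PATH
)

# Intermediate output of the script step, stored on the workspace blob store
output_export_blob = PipelineData(
    'export_blob',
    datastore=workspaceblobstore_datastore,
)

# Step 1: run the script, which writes test.csv under output_export_blob
test_step = PythonScriptStep(
    script_name="test_upload_stackoverflow.py",
    arguments=[
        "--output_extract", output_export_blob,
    ],
    inputs=[
        input_dataset.as_named_input('input'),
    ],
    outputs=[output_export_blob],
    compute_target=aml_compute,
    source_directory="."
)

# Destination on the Gen 1 Data Lake
output_export_adls = DataReference(
    datastore=prdadls_datastore,
    path_on_datastore=os.path.join(FOLDER_PATH_OUTPUT, 'test.csv'),
    data_reference_name='export_adls'
)

# Step 2: copy the script output from blob storage to the Data Lake
export_to_adls = DataTransferStep(
    name='export_output_to_adls',
    source_data_reference=output_export_blob,
    source_reference_type='file',
    destination_data_reference=output_export_adls,
    compute_target=adf_compute
)

pipeline = Pipeline(
    workspace=aml_workspace,
    steps=[
        test_step,
        export_to_adls
    ]
)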
test_upload_stackoverflow.py
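import argparse
import os
import pathlib
from azureml.core import Datastore, Run

# Path of the PipelineData output, passed in by the PythonScriptStep
parser = argparse.ArgumentParser("train")
parser.add_argument("--output_extract", type=str)
args = parser.parse_args()

run = Run.get_context()

# Load the named input dataset into a pandas DataFrame
df_data_all = (
    run
    .input_datasets["input"]
    .to_pandas_dataframe()
)

# Treat the output path as a directory and write test.csv inside it
os.makedirs(args.output_extract, exist_ok=True)
df_data_all.to_csv(
    os.path.join(args.output_extract, "test.csv"),
    index=False
)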
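
To make the suspected mismatch concrete, here is a minimal local sketch of the two ways a script can write to the path it receives. It uses only the standard library and a temporary folder, so it assumes the PipelineData path behaves like an ordinary filesystem path on the compute target. The helper names write_as_directory and write_as_file are hypothetical and not part of the azureml SDK. The first helper mirrors the script above (the path becomes a directory containing test.csv); the second treats the path itself as the CSV file, which may be what a transfer with source_reference_type='file' expects. Whether this explains the 0-byte file is an assumption, not something confirmed here.

```python
import csv
import os
import tempfile


def write_as_directory(output_path, rows):
    """Mirror the script above: output_path is a directory, test.csv goes inside it."""
    os.makedirs(output_path, exist_ok=True)
    file_path = os.path.join(output_path, "test.csv")
    with open(file_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    return file_path


def write_as_file(output_path, rows):
    """Alternative layout: output_path itself is the CSV file."""
    parent = os.path.dirname(output_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(output_path, "w", newline="") as f:
        csv.writer(f).writerows(rows)
    return output_path


rows = [["a", "b"], [1, 2]]

with tempfile.TemporaryDirectory() as tmp:
    # Stand-ins for the path that would be passed via --output_extract
    dir_style = os.path.join(tmp, "export_blob_dir")
    file_style = os.path.join(tmp, "export_blob_file")

    written_inside = write_as_directory(dir_style, rows)
    write_as_file(file_style, rows)

    # Record what ended up at each path
    dir_style_is_directory = os.path.isdir(dir_style)
    file_style_is_file = os.path.isfile(file_style)
    nested_name = os.path.basename(written_inside)

print("directory layout:", dir_style_is_directory, "contains", nested_name)
print("file layout:", file_style_is_file)
```

With the directory layout, the data lives one level below the path that the pipeline passes around, so a consumer that reads the path as a single file would not see test.csv directly.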