
I'm using the self-hosted integration runtime in Azure Data Factory to copy data from an on-premises source (a normal file system) to an Azure Blob Storage destination. After the files are transferred, I want to process them automatically by invoking a notebook running on a Databricks cluster. The pipeline works fine, but my question concerns the output of the Copy activity.

Is there a way to get information on the transferred files and folders for each run? I would pass this information as parameters to the notebook.

Looking at the documentation, it seems only aggregated information is available:

https://docs.microsoft.com/en-us/azure/data-factory/copy-activity-overview

That kind of makes sense if you transfer huge numbers of files. If it isn't possible, I guess an alternative approach would be to leave the copy process to itself and create another pipeline based on storage account events. Or maybe store the new file and folder information for each run in a fixed text file, transfer it as well, and read it in the notebook?


2 Answers


If you want to get information about the files or directories being read by Data Factory, this can be done with the Get Metadata activity; see the following answer for an example.
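
For completeness, here is a rough sketch of how the notebook could consume that metadata, assuming you serialize the Get Metadata activity's childItems output to JSON and hand it to the Databricks Notebook activity as a base parameter. The parameter name transferred_files and the activity name are placeholders of mine, not part of the original pipeline; dbutils and spark are the globals available inside a Databricks notebook.

import json

# Assumed base parameter on the Notebook activity, e.g. set to
# @string(activity('Get Metadata1').output.childItems) -- names are placeholders.
dbutils.widgets.text("transferred_files", "[]")

# Each childItems entry carries a "name" and a "type" ("File" or "Folder").
items = json.loads(dbutils.widgets.get("transferred_files"))

for item in items:
    if item["type"] == "File":
        print("Transferred file:", item["name"])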

Another approach to detecting new files in your notebook would be to use Structured Streaming with file sources. This works pretty well, and you just call the Notebook activity after the Copy activity.

For this, you define a streaming input DataFrame:

streamingInputDF = (
    spark.readStream
        .schema(pqtSchema)    # schema of the incoming Parquet files
        .parquet(inputPath)   # input directory in Blob Storage
)

with inputPath pointing to the input directory in Blob Storage. Supported file formats are text, CSV, JSON, ORC, and Parquet, so whether this will work for you depends on your concrete scenario.

The important part is that you use the trigger-once option on the sink, so the notebook does not need to run permanently, e.g.:

streamingOutputDF \
    .repartition(1) \
    .writeStream \
    .format("parquet") \
    .partitionBy("Id") \
    .option("checkpointLocation", adlpath + "spark/checkpointlocation/data/trusted/sensorreadingsdelta") \
    .option("path", targetPath + "delta") \
    .trigger(once=True) \
    .start()

Another approach could be to use Azure Queue Storage (AQS); see the following documentation.


The solution was actually quite simple in this case. I just created another pipeline in Azure Data Factory that is triggered by a Blob Created event, with the folder and file name passed as parameters to my notebook. It seems to work well, and only a minimal amount of configuration or code is required. Basic filtering can be done with the event trigger, and the rest is up to the notebook.

For anyone else stumbling across this scenario, details below:

https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger
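
As a minimal sketch of the notebook side, assuming the event trigger's @triggerBody().folderPath and @triggerBody().fileName are mapped to base parameters named folderPath and fileName on the Notebook activity. The parameter names, the mount point /mnt/landing, and the reader used are assumptions for illustration, not the exact original setup.

# Read the folder and file name passed in by the event-triggered pipeline.
# Parameter names "folderPath"/"fileName" are assumed to match the base
# parameters configured on the ADF Notebook activity.
dbutils.widgets.text("folderPath", "")
dbutils.widgets.text("fileName", "")

folder_path = dbutils.widgets.get("folderPath")
file_name = dbutils.widgets.get("fileName")

# Assumes the Blob Storage container is mounted at /mnt/landing; adjust the
# path construction to match the shape of your trigger's folderPath value.
input_path = f"/mnt/landing/{folder_path}/{file_name}"

# Process only the file that triggered this run (use the reader that matches
# your actual file format).
df = spark.read.text(input_path)
df.show(5, truncate=False)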