0 votes

I am setting up a Data Flow in ADF that takes an Azure Table Dataset as Source and adds a Derived Column that creates a column named "filename" with a dynamic value, based on a data field from the source schema.

The output is then sent to a sink that is linked to a Dataset attached to Blob Storage (I tried both ADLS Gen2 and standard Blob Storage).

However, after executing the pipeline, instead of finding multiple files in my container, I see folders created with names like filename=ABC123.csv, each of which contains other files (it makes me think of Parquet output):

- filename=ABC123.csv
  + _started_UNIQUEID
  + part-00000-tid-UNIQUEID-guids.c000.csv

So I'm clearly missing something, as I need single files listed in the dataset container with the names I have specified in the pipeline.

This is what the pipeline looks like: [screenshot of the pipeline]

The Optimize tab of the Sink shape looks like this: [screenshot of the Sink Optimize tab]

Here you can see the settings of the Sink shape: [screenshot of the Sink settings]

And this is the code of the pipeline (some parts have been edited out):

source(output(
        PartitionKey as string,
        RowKey as string,
        Timestamp as string,
        DeviceId as string,
        SensorValue as double
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    inferDriftedColumnTypes: true) ~> devicetable
devicetable derive(filename = Isin + '.csv') ~> setoutputfilename
setoutputfilename sink(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn:'filename',
    mapColumn(
        RowKey,
        Timestamp,
        DeviceId,
        SensorValue
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> distributetofiles

Any suggestions or tips? (I'm rather new to ADF, so bear with me)

Can you share a screen capture of the Sink's Settings and Optimize tabs? There are many options there to manage the output files. – Joel Cochran
@JoelCochran, thx for replying. I updated the above description with two screenshots. – Sam Vanhoutte
@Sam Vanhoutte looks like you did this before: stackoverflow.com/questions/66775452/… – wwnde

2 Answers

0 votes

I recently struggled through something similar to your scenario (but not exactly the same). There are a lot of options and moving parts here, so this post is not meant to be exhaustive. Hopefully something in it will steer you towards the solution you are after.

Step 1: Source Partitioning

In Data Flow, you can group like rows together via Set Partitioning. One of the many options is by Key (a column in the source): [screenshot of the Source partitioning options]

In this example, we have 51 US States (50 states + DC), and so will end up with 51 partitions.
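
For what it's worth, partition settings end up in the data flow script on the transformation they are applied to. Below is a rough sketch of key partitioning applied to the question's source. I'm guessing at the exact rendering of the Key option (partitionBy('key', ...)) based on how hash partitioning appears elsewhere (partitionBy('hash', <n>)), and DeviceId is just a column picked from the question's schema, so treat this as illustrative only and compare it against what the designer generates for you:

source(output(
        PartitionKey as string,
        RowKey as string,
        Timestamp as string,
        DeviceId as string,
        SensorValue as double
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    inferDriftedColumnTypes: true,
    partitionBy('key', 0, DeviceId)) ~> devicetable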

Step 2: Sink Settings

As you found out, the "As data in column" option results in a structured folder name like {columnName}={columnValue}. I've been told this is because it is a standard in Hadoop/Spark type environments. Inside that folder will be a set of files, typically with non-human-friendly GUID based names.

"Default" will give much the same result you currently have, without the column based folder name. Output to Single File" is pretty self-explanatory, and the farthest thing from the solution you are after. If you want control over the final file names, the best option I have found is the "Pattern" option. This will generate file(s) with the specified name and a variable number [n]. I honestly don't know what Per partition would generate, but it may get close to you the results you are after, 1 file per column value.

[screenshot of the Sink file name options]
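
As a point of reference, here is roughly how I'd expect the "Pattern" option to look in the script for the question's sink. The filePattern property is my assumption for how the UI option is rendered, and devices[n].csv is just a sample pattern I made up; note that rowUrlColumn goes away because the file name option changes:

setoutputfilename sink(allowSchemaDrift: true,
    validateSchema: false,
    filePattern:'devices[n].csv',
    mapColumn(
        RowKey,
        Timestamp,
        DeviceId,
        SensorValue
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> distributetofiles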

Some caveats:

  • The folder name is defined in the Sink Dataset, NOT in the Data Flow. Dataset parameters are really probably "Step 0". For Blob type output, you could probably hard code the folder name like "myfolder/fileName-[n]". YMMV.
  • Unfortunately, none of these options will permit you to use a derived column to generate the file name. [If you open the expression editor, you'll find that "Incoming schema" is not populated.]

Step 3: Sink Optimize

The last piece you may experiment with is Sink Partitioning under the Optimize tab: [screenshot of the Sink Optimize tab]

"Use current partitioning" will group the results based on the partition set in the Source configuration. "Single partition" will group all the results into a single output group (almost certainly NOT what you want). "Set partitioning" will allow you to re-group the Sink data based on a Key column. Unlike the Sink settings, this WILL permit you to access the derived column name, but my guess is that you will end up with the same folder naming problem you have now.

At the moment, this is all I know. I believe that there is a combination of these options that will produce what you want, or something close to it. You may need to approach this in multiple steps, such as having this flow output the incorrectly named folders to a staging location, then having another pipeline/flow process each folder and collapse the results into the desired file name.

0 votes

You're seeing the ghost files left behind by the Spark process in your dataset folder path. When you use 'As data in column', ADF will write the file using your field value starting at the container root.

You'll see this noted on the 'Column with file name' property:

[screenshot of the 'Column with file name' property note]

So, if you navigate to your storage container root, you should see the ABC123.csv file.

Now, if you want to put that file in a folder, just prepend the folder name in your Derived Column transformation formula, something like this:

"output/folder1/{Isin}.csv"

The double-quotes activate ADF's string interpolation. You can combine literal text with formulas that way.
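
Applied to the data flow script from the question, the Derived Column line would then look something like this (output/folder1 is just the sample path from above; substitute your own folder path):

devicetable derive(filename = "output/folder1/{Isin}.csv") ~> setoutputfilename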