I’m trying to write an Azure Data Factory Data Flow that handles two similar, versioned CSV files. The version 1 file has 48 columns. The version 2 file has 50 columns: the same 48 columns as version 1, plus 2 additional columns appended to the end. I’d like to create a destination Parquet file that contains all 50 columns, to load into my SQL DW via PolyBase. Historically, we have over 6,000 files in the same blob source, with no easy way to tell which files have 48 columns and which have 50. Below is the closest I’ve come to a solution.
- Source: CSV with Allow schema drift enabled. No schema is defined on the CSV dataset.
- MapDrifted: derived columns for all 50 columns, e.g. `toString(byName('Manufacturer'))`.
- Sink: Parquet dataset whose schema is defined by a Parquet template file containing all 50 columns. The sink partition is set by sourcefilename, so each incoming file generates its own Parquet file in the output.
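The per-file behavior the steps above aim for can be sketched outside ADF. The Python below is a local illustration only, not ADF code: each CSV is read using its own header row, and every target column the file lacks comes back as `None`, much as the data flow's `byName()` returns NULL when no column matches. `normalize_rows` and the shortened target list are hypothetical; the two trailing names merely stand in for the columns that version 2 appends.

```python
import csv
import io
from typing import Dict, Iterator, List, Optional, TextIO


def normalize_rows(
    f: TextIO, target_columns: List[str]
) -> Iterator[Dict[str, Optional[str]]]:
    """Yield each CSV row keyed by target_columns, using this file's own header.

    Target columns absent from the file's header come back as None, similar to
    byName() returning NULL for a column that is not present. Columns that are
    not in target_columns are ignored.
    """
    reader = csv.DictReader(f)
    for row in reader:
        yield {name: row.get(name) for name in target_columns}


if __name__ == "__main__":
    # Hypothetical, shortened schema. "Search SKU" and "Search Manufacturer"
    # stand in for the two columns that only version 2 files contain.
    target = ["Manufacturer", "SKU", "Search SKU", "Search Manufacturer"]
    v1 = io.StringIO("Manufacturer,SKU\nAcme,123\n")
    v2 = io.StringIO(
        "Manufacturer,SKU,Search SKU,Search Manufacturer\nAcme,456,X-1,Acme\n"
    )
    for f in (v1, v2):
        for row in normalize_rows(f, target):
            print(row)
```

Because each file is matched by its own header names rather than by position, a 48-column file simply yields nulls for the two missing columns, while a 50-column file keeps all of its data.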
This solution works with a set of two test files, one with 48 columns and one with 50 columns. Two Parquet files are created, both with 50 columns: one is populated up to the 48th column, and the other is populated with all 50 columns. However, if I add more 48-column source files to the test, the file with 50 columns loses the last two columns of data and ends up with only 48 columns in it. I thought this would be a common issue that ADF could solve, i.e. file versions changing over time. Any suggestions? Below is the script of my ADF data flow:
```
source(allowSchemaDrift: true,
    validateSchema: false,
    rowUrlColumn: 'sourcefilename',
    inferDriftedColumnTypes: true,
    multiLineRow: true,
    wildcardPaths:['avail/archive_csv2/*.csv']) ~> SRCAvailCSV
SRCAvailCSV derive(Manufacturer = toString(byName('Manufacturer')),
    SKU = toString(byName('SKU')),
    {Partner Name} = toString(byName('Partner Name')),
    {Partner Part Number} = toString(byName('Partner Part Number')),
    {Search Date} = toString(byName('Search Date')),
    {Search Result Description} = toString(byName('Search Result Description')),
    {1st Line Description} = toString(byName('1st Line Description')),
    {2nd Line Description} = toString(byName('2nd Line Description')),
    {Product Category} = toString(byName('Product Category')),
    {Product Category 1} = toString(byName('Product Category 1')),
    {Product Category 2} = toString(byName('Product Category 2')),
    {Product Category 3} = toString(byName('Product Category 3')),
    {Product Category 4} = toString(byName('Product Category 4')),
    {UNSPSC Code} = toString(byName('UNSPSC Code')),
    Pricing = toString(byName('Pricing')),
    Currency = toString(byName('Currency')),
    {Availability Qty} = toString(byName('Availability Qty')),
    {Availability Status} = toString(byName('Availability Status')),
    {Average Rating} = toString(byName('Average Rating')),
    {Total Reviews} = toString(byName('Total Reviews')),
    Brand = toString(byName('Brand')),
    Model = toString(byName('Model')),
    {Product Line} = toString(byName('Product Line')),
    {Partner Site} = toString(byName('Partner Site')),
    {Product URL} = toString(byName('Product URL')),
    Warranty = toString(byName('Warranty')),
    {Product Length} = toString(byName('Product Length')),
    {Product Width} = toString(byName('Product Width')),
    {Product Height} = toString(byName('Product Height')),
    {Product Depth} = toString(byName('Product Depth')),
    {Product Weight} = toString(byName('Product Weight')),
    {Fullfilling Partner} = toString(byName('Fullfilling Partner')),
    {Date First Available} = toString(byName('Date First Available')),
    {Frequently Bought Together 1} = toString(byName('Frequently Bought Together 1')),
    {Frequently Bought Together 1 Part Number} = toString(byName('Frequently Bought Together 1 Part Number')),
    {Frequently Bought Together 2} = toString(byName('Frequently Bought Together 2')),
    {Frequently Bought Together 2 Part Number} = toString(byName('Frequently Bought Together 2 Part Number')),
    {Frequently Bought Together 3} = toString(byName('Frequently Bought Together 3')),
    {Frequently Bought Together 3 Part Number} = toString(byName('Frequently Bought Together 3 Part Number')),
    {Frequently Bought Together 4} = toString(byName('Frequently Bought Together 4')),
    {Frequently Bought Together 4 Part Number} = toString(byName('Frequently Bought Together 4 Part Number')),
    {From the Manufacturer} = toString(byName('From the Manufacturer')),
    {Bestesellers Rank 1} = toString(byName('Bestesellers Rank 1')),
    {Bestsellers Rank 2} = toString(byName('Bestsellers Rank 2')),
    {Bestsellers Rank 3} = toString(byName('Bestsellers Rank 3')),
    {Bestsellers Rank 4} = toString(byName('Bestsellers Rank 4')),
    Endpoint = toString(byName('Endpoint')),
    {Related StarTech.com SKU} = toString(byName('Related StarTech.com SKU')),
    {Search SKU} = toString(byName('Search SKU')),
    {Search Manufacturer} = toString(byName('Search Manufacturer')),
    sourcefilename = sourcefilename) ~> MapDrifted1
MapDrifted1 sink(input(
        FileName as string,
        Manufacturer as string,
        SKU as string,
        PartnerName as string,
        PartnerPartNumber as string,
        SearchDate as string,
        SearchResultDescription as string,
        {1stLineDescription} as string,
        {2ndLineDescription} as string,
        ProductCategory as string,
        ProductCategory1 as string,
        ProductCategory2 as string,
        ProductCategory3 as string,
        ProductCategory4 as string,
        UNSPSCCode as string,
        Pricing as string,
        Currency as string,
        AvailabilityQty as string,
        AvailabilityStatus as string,
        AverageRating as string,
        TotalReviews as string,
        Brand as string,
        Model as string,
        ProductLine as string,
        PartnerSite as string,
        ProductURL as string,
        Warranty as string,
        ProductLength as string,
        ProductWidth as string,
        ProductHeight as string,
        ProductDepth as string,
        ProductWeight as string,
        FullfillingPartner as string,
        DateFirstAvailable as string,
        FrequentlyBoughtTogether1 as string,
        FrequentlyBoughtTogether1PartNumber as string,
        FrequentlyBoughtTogether2 as string,
        FrequentlyBoughtTogether2PartNumber as string,
        FrequentlyBoughtTogether3 as string,
        FrequentlyBoughtTogether3PartNumber as string,
        FrequentlyBoughtTogether4 as string,
        FrequentlyBoughtTogether4PartNumber as string,
        FromtheManufacturer as string,
        BestesellersRank1 as string,
        BestsellersRank2 as string,
        BestsellersRank3 as string,
        BestsellersRank4 as string,
        Endpoint as string,
        RelatedStarTechcomSKU as string,
        SearchSKU as string,
        SearchManufacturer as string
    ),
    allowSchemaDrift: false,
    validateSchema: false,
    format: 'parquet',
    rowUrlColumn:'sourcefilename',
    mapColumn(
        FileName = sourcefilename,
        Manufacturer,
        SKU,
        PartnerName = {Partner Name},
        PartnerPartNumber = {Partner Part Number},
        SearchDate = {Search Date},
        SearchResultDescription = {Search Result Description},
        {1stLineDescription} = {1st Line Description},
        {2ndLineDescription} = {2nd Line Description},
        ProductCategory = {Product Category},
        ProductCategory1 = {Product Category 1},
        ProductCategory2 = {Product Category 2},
        ProductCategory3 = {Product Category 3},
        ProductCategory4 = {Product Category 4},
        UNSPSCCode = {UNSPSC Code},
        Pricing,
        Currency,
        AvailabilityQty = {Availability Qty},
        AvailabilityStatus = {Availability Status},
        AverageRating = {Average Rating},
        TotalReviews = {Total Reviews},
        Brand,
        Model,
        ProductLine = {Product Line},
        PartnerSite = {Partner Site},
        ProductURL = {Product URL},
        Warranty,
        ProductLength = {Product Length},
        ProductWidth = {Product Width},
        ProductHeight = {Product Height},
        ProductDepth = {Product Depth},
        ProductWeight = {Product Weight},
        FullfillingPartner = {Fullfilling Partner},
        DateFirstAvailable = {Date First Available},
        FrequentlyBoughtTogether1 = {Frequently Bought Together 1},
        FrequentlyBoughtTogether1PartNumber = {Frequently Bought Together 1 Part Number},
        FrequentlyBoughtTogether2 = {Frequently Bought Together 2},
        FrequentlyBoughtTogether2PartNumber = {Frequently Bought Together 2 Part Number},
        FrequentlyBoughtTogether3 = {Frequently Bought Together 3},
        FrequentlyBoughtTogether3PartNumber = {Frequently Bought Together 3 Part Number},
        FrequentlyBoughtTogether4 = {Frequently Bought Together 4},
        FrequentlyBoughtTogether4PartNumber = {Frequently Bought Together 4 Part Number},
        FromtheManufacturer = {From the Manufacturer},
        BestesellersRank1 = {Bestesellers Rank 1},
        BestsellersRank2 = {Bestsellers Rank 2},
        BestsellersRank3 = {Bestsellers Rank 3},
        BestsellersRank4 = {Bestsellers Rank 4},
        Endpoint,
        RelatedStarTechcomSKU = {Related StarTech.com SKU},
        SearchSKU = {Search SKU},
        SearchManufacturer = {Search Manufacturer}
    )) ~> sink1
```
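For the 6,000-odd historical files, one way to find out which are 48-column and which are 50-column files is to read only each file's header row and count its fields. The sketch below assumes the blobs under `avail/archive_csv2/` have been copied to a local folder (the `archive_csv2` path is hypothetical); reading straight from Blob Storage would need the Azure Storage SDK, which is not shown. `header_width` and `group_by_width` are illustrative names, not ADF features, and UTF-8 encoding is an assumption.

```python
import csv
from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, TextIO


def header_width(f: TextIO) -> int:
    """Return the number of fields in the first (header) row of a CSV stream."""
    reader = csv.reader(f)
    return len(next(reader, []))


def group_by_width(paths: Iterable[Path]) -> Dict[int, List[Path]]:
    """Group CSV files by header column count (e.g. 48 for v1, 50 for v2)."""
    groups: Dict[int, List[Path]] = defaultdict(list)
    for path in paths:
        # newline="" is what the csv module expects; encoding is assumed UTF-8,
        # and undecodable bytes are replaced since only the field count matters.
        with path.open(newline="", encoding="utf-8", errors="replace") as f:
            groups[header_width(f)].append(path)
    return dict(groups)


if __name__ == "__main__":
    # Hypothetical local copy of the blob folder avail/archive_csv2/.
    folder = Path("archive_csv2")
    groups = group_by_width(sorted(folder.glob("*.csv")))
    for width, files in sorted(groups.items()):
        print(f"{width} columns: {len(files)} files")
```

Only the header line of each file is parsed, so the scan stays cheap even across thousands of files.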