In our Azure Data Lake, we have daily files recording events and the coordinates for those events. We need to take these coordinates and look up which State, County, Township, and Section they fall into. I've attempted several versions of this code:
- I attempted to do this in U-SQL. I even uploaded a custom assembly that implemented Microsoft.SqlServer.Types.SqlGeography methods, only to find ADLA isn't set up to perform row-by-row operations like geocoding.
- I pulled all the rows into SQL Server, converted the coordinates into a SqlGeography, and built T-SQL code to perform the State, County, etc. lookups. After much optimization, I got this process down to ~700 ms/row. With 133M rows in the backlog and ~16k rows added daily, we're looking at nearly 3 years to catch up (133M rows × 0.7 s ≈ 93M seconds ≈ 2.95 years, before counting the new rows). So I parallelized the T-SQL; things got better, but not enough.
- I took the T-SQL code and rebuilt the process as a console application, since the SqlGeography library is actually a .NET library, not a native SQL Server product. I was able to get single-threaded processing down to ~500 ms. Adding .NET's parallelism (Parallel.ForEach) and throwing 10/20 of the cores on my machine at it helps a lot, but still isn't enough.
- I attempted to rewrite this code as an Azure Function that processes the files in the data lake file by file. Most of the files timed out, since they took longer than 10 minutes to process. So I've updated the code to read in the files and shred the rows into Azure Queue storage. A second Azure Function then fires for each row in the queue. The idea is that Azure Functions can scale out far beyond what any single machine can.
And this is where I'm stuck. I can't reliably write rows to files in ADLS. Here is the code as I have it now.
    public static void WriteGeocodedOutput(string Contents, String outputFileName, ILogger log) {
        AdlsClient client = AdlsClient.CreateClient(ADlSAccountName, adlCreds);
        //if the file doesn't exist write the header first
        try {
            if (!client.CheckExists(outputFileName)) {
                using (var stream = client.CreateFile(outputFileName, IfExists.Fail)) {
                    byte[] headerByteArray = Encoding.UTF8.GetBytes("EventDate, Longitude, Latitude, RadarSiteID, CellID, RangeNauticalMiles, Azimuth, SevereProbability, Probability, MaxSizeinInchesInUS, StateCode, CountyCode, TownshipCode, RangeCode\r\n");
                    //stream.Write(headerByteArray, 0, headerByteArray.Length);
                    client.ConcurrentAppend(outputFileName, true, headerByteArray, 0, headerByteArray.Length);
                }
            }
        } catch (Exception e) {
            log.LogInformation("multiple attempts to create the file. Ignoring this error, since the file was created.");
        }
        //then write the data
        byte[] textByteArray = Encoding.UTF8.GetBytes(Contents);
        for (int attempt = 0; attempt < 5; attempt++) {
            try {
                log.LogInformation("prior to write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
                var offset = client.GetDirectoryEntry(outputFileName).Length;
                client.ConcurrentAppend(outputFileName, false, textByteArray, 0, textByteArray.Length);
                log.LogInformation("AFTER write, the outputfile size is: " + client.GetDirectoryEntry(outputFileName).Length);
                //if successful, stop trying to write this row
                attempt = 6;
            }
            catch (Exception e){
                log.LogInformation($"exception on adls write: {e}");
            }
            Random rnd = new Random();
            Thread.Sleep(rnd.Next(attempt * 60));
        }
    }
The file gets created when it needs to be, but my log shows that several threads tried to create it, and the header row isn't always written.
I also no longer get any data rows written, only this error:
"BadRequest ( IllegalArgumentException concurrentappend failed with error 0xffffffff83090a6f
(Bad request. The target file does not support this particular type of append operation.
If the concurrent append operation has been used with this file in the past, you need to append to this file using the concurrent append operation.
If the append operation with offset has been used in the past, you need to append to this file using the append operation with offset.
On the same file, it is not possible to use both of these operations.). []
I feel like I'm missing some fundamental design idea here. The code should try to write a row into a file. If the file doesn't yet exist, create it and put the header row in. Then, put in the row.
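To make the intended semantics concrete, here is a minimal in-memory sketch of the behavior I'm after. Nothing in it touches ADLS: `InMemoryAppendStore`, `AppendRow`, and `Demo` are hypothetical names made up for illustration, the header is shortened, and the `lock` only stands in for whatever atomic create-or-append guarantee the real store would have to provide.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a file store; this is NOT the ADLS SDK.
public sealed class InMemoryAppendStore
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, StringBuilder> _files =
        new Dictionary<string, StringBuilder>();

    // Appends one row. If the file does not exist yet, it is created and the
    // header is written first. Both steps run under one lock, so only the
    // first caller for a given path ever writes the header.
    public void AppendRow(string path, string header, string row)
    {
        lock (_gate)
        {
            if (!_files.TryGetValue(path, out var file))
            {
                file = new StringBuilder();
                file.Append(header).Append("\r\n");
                _files[path] = file;
            }
            file.Append(row).Append("\r\n");
        }
    }

    public string[] ReadLines(string path)
    {
        lock (_gate)
        {
            return _files[path].ToString()
                .Split(new[] { "\r\n" }, StringSplitOptions.RemoveEmptyEntries);
        }
    }
}

public static class Demo
{
    public static string[] Run()
    {
        var store = new InMemoryAppendStore();
        // 100 concurrent writers, as if 100 queue messages fired at once.
        Parallel.For(0, 100, i =>
            store.AppendRow("out.csv", "EventDate,Longitude,Latitude", $"row{i}"));
        return store.ReadLines("out.csv");
    }

    public static void Main()
    {
        var lines = Run();
        Console.WriteLine($"{lines.Length} lines, first: {lines[0]}");
    }
}
```

Here the header always lands first and exactly once, and the rows follow in whatever order the writers arrive. What I can't work out is how to get that same behavior from ADLS when the writers are separate function instances rather than threads sharing a lock.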
What's the best-practice way to accomplish this kind of write scenario?
Any other suggestions of how to handle this kind of parallel-write workload in ADLS?