We're using U-SQL to extract sensor data from a set of .csv files. Each record contains a sensor ID, the time of the measurement, the measured value, and the time at which the record was received:

+----------+---------------------+------------------+---------------------+
| SensorID |   MeasurementTime   | MeasurementValue |    ReceivedTime     |
+----------+---------------------+------------------+---------------------+
| xxx      | 2017-09-10 11:00:00 |           12.342 | 2017-09-19 14:25:17 |
| xxx      | 2017-09-10 12:00:00 |           14.654 | 2017-09-19 14:25:17 |
| yyy      | 2017-09-10 11:00:00 |            1.054 | 2017-09-19 14:25:17 |
| yyy      | 2017-09-10 12:00:00 |            1.354 | 2017-09-19 14:25:17 |
  ...
| xxx      | 2017-09-10 11:00:00 |           10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+

The files are stored in ADLS under a path based on the date portion of the measurement time, so the data above would be found in /Data/2017/09/10/measurements.csv. The first four rows were written at 14:25:17 on 19 September, and the last row was appended one hour later, at 15:25:17.

As the example above illustrates, new values for the same SensorID and MeasurementTime can arrive at a later time. Each partition holds a few million rows, and a few thousand rows are appended to a small number of partitions every day. We want to run a batch job, say every 24 hours, that outputs only the newest value for each combination of SensorID and MeasurementTime. For this, we use a U-SQL script similar to this:

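// @measurements is a placeholder name for the rowset produced by the EXTRACT step (not shown).
@newestMeasurements_addRN =
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY PDate,
                                           SensorId,
                                           MeasurementTime
                              ORDER BY ReceivedTime DESC) AS MeasurementRN
    FROM @measurements;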

@newestMeasurements =
    SELECT SensorId,
           MeasurementTime,
           MeasurementValue
    FROM @newestMeasurements_addRN
    WHERE MeasurementRN == 1;

Here, PDate is a virtual column inferred from the yyyy/MM/dd part of the CSV file's path; it equals the date portion of MeasurementTime.
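For reference, a virtual column like PDate is typically declared through a file set pattern in the EXTRACT statement. The following is a minimal sketch rather than our exact script: the rowset name @measurements and the column types are assumptions, and it assumes the files have no header row.

```usql
// Sketch only: @measurements and the column types are assumed, not taken from the real script.
// {PDate:yyyy}/{PDate:MM}/{PDate:dd} maps the folder names onto a DateTime virtual column.
@measurements =
    EXTRACT SensorId string,
            MeasurementTime DateTime,
            MeasurementValue double,
            ReceivedTime DateTime,
            PDate DateTime // virtual column, filled from the path and not from the file content
    FROM "/Data/{PDate:yyyy}/{PDate:MM}/{PDate:dd}/measurements.csv"
    USING Extractors.Csv();
```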

Since we use PDate in the PARTITION BY clause of the window function, I expected this operation to be parallelised, because we never need to look at other days (partitions) when finding the newest record for a given SensorID and MeasurementTime. Unfortunately, that does not seem to be the case, judging by the job graph:

[Image: job graph]

Here, we are extracting data from 4 different days. Each of the Extract vertices outputs the full number of records and leaves the task of identifying the newest records to the Combine vertex at the bottom, which indicates that the ROW_NUMBER and the subsequent filtering do not happen in parallel.

  • Is this a bug in the implementation of ROW_NUMBER?
  • Is there a different U-SQL technique we can use to ensure parallelism?

1 Answer

I managed to find a usable solution: I encapsulated the U-SQL that detects the latest measurements in a U-SQL stored procedure that takes a value corresponding to PDate as an input parameter.

Then I simply execute this stored procedure several times, once for each date that I want to process in parallel:

DetectLatestMeasurements(20170910);
DetectLatestMeasurements(20170911);
DetectLatestMeasurements(20170912);
DetectLatestMeasurements(20170913);

The stored procedure handles the EXTRACT, transformation and OUTPUT of one day's worth of data, so this does the job, and it is parallelised the way I expect.
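To illustrate the idea, here is a minimal sketch of what such a procedure might look like. It is not the actual procedure: the name DetectLatestMeasurementsForFile, the column types and the output path are hypothetical, and to keep the sketch simple it takes the input and output paths as string parameters instead of a single date value. It also assumes the files have no header row.

```usql
// Sketch only: the procedure name, parameters and column types are assumptions.
CREATE PROCEDURE IF NOT EXISTS DetectLatestMeasurementsForFile(@inputPath string, @outputPath string)
AS
BEGIN
    // Read one day's file; no PDate column is needed because each call covers a single day.
    @measurements =
        EXTRACT SensorId string,
                MeasurementTime DateTime,
                MeasurementValue double,
                ReceivedTime DateTime
        FROM @inputPath
        USING Extractors.Csv();

    // Rank the records per sensor and measurement time, newest ReceivedTime first.
    @ranked =
        SELECT SensorId,
               MeasurementTime,
               MeasurementValue,
               ROW_NUMBER() OVER (PARTITION BY SensorId,
                                               MeasurementTime
                                  ORDER BY ReceivedTime DESC) AS MeasurementRN
        FROM @measurements;

    // Keep only the newest record of each group.
    @newest =
        SELECT SensorId,
               MeasurementTime,
               MeasurementValue
        FROM @ranked
        WHERE MeasurementRN == 1;

    OUTPUT @newest
    TO @outputPath
    USING Outputters.Csv();
END;
```

With this variant, each call would pass the paths for one day, for example DetectLatestMeasurementsForFile("/Data/2017/09/10/measurements.csv", "/Output/2017/09/10/latest.csv"); where the /Output path is made up for the example.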