I am looking for a way to do the equivalent of SQL's lead/lag functions in Google Dataflow/Beam. In SQL, my case would be something like

lead(balance, 1) over(partition by orderId order by order_Date)

In Beam, we parse the input text file and create a class Client_Orders to hold the data. For simplicity, let's say this class has orderId, order_Date and balance members. We partition by orderId by constructing KVs in a PCollection:

PCollection<KV<String, Iterable<Client_Orders>>> mainCollection = pipeline
    .apply(TextIO.Read.named("Reading input file")
        .from(options.getInputFilePath()))
    // Parse each line into a Client_Orders object, keyed by orderId.
    .apply(ParDo.named("Extracting client order terms from file")
        .of(/* DoFn omitted in the original post */))
    .apply("create KV...", GroupByKey.<String, Client_Orders>create());

In Beam, I know we can do windowing, but that generally requires setting a window size as a duration, e.g. Window.into(FixedWindows.of(Duration.standardDays(n))), which doesn't seem to help in this case. Should I iterate through the PCollection in order_Date order?

1 Answer

If your data is too large per-key to sort in memory, you will want the Beam "sorter" extension.

I will explain:

In Beam (hence Dataflow) the elements of a PCollection are unordered. This supports the unified programming model whereby the same data yields the same output whether it arrives as a real-time stream or is read from stored files. It also supports isolated failure recovery, provides robustness to network delays, etc.

In many years of massive-scale data processing, almost all uses of global order have turned out not to be useful (in part because anyone who needs scalability finds a different way to achieve their goals). And even if a global ordering exists, processing does not occur in order (because it is parallel), so the global ordering would be lost almost immediately. So global ordering is not on the roadmap.

The kind of ordering you need, though, is per key. This is common and useful, and is often known as "value sorting". When a GroupByKey operation yields the grouped values for a key (an element of type KV<K, Iterable<V>>), there is often a benefit to a user-defined order for the values. Since the sorting happens within a single element, the order is preserved as the element travels through your pipeline. And it is not necessarily prohibitively expensive to sort the values: often the very same operation that groups by key can be leveraged to also sort the values as they are being grouped. This is on the Beam roadmap, but not yet part of the Beam model.

So, for now, there is the above Java-based extension that can sort the values for you.
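
If the values for a single orderId do fit in memory, the lead step itself can be done without the extension by sorting the grouped values yourself. The following is only a minimal sketch in plain Java, not Beam API: ClientOrder is a hypothetical stand-in for the question's Client_Orders class, WithLead and lead() are names invented for illustration, and the date is assumed to be an ISO-8601 string (yyyy-MM-dd) so that string order matches date order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class LeadPerKey {

    /** Hypothetical stand-in for the question's Client_Orders class. */
    static final class ClientOrder {
        final String orderId;
        final String orderDate; // assumed ISO-8601, e.g. "2016-01-31"
        final double balance;

        ClientOrder(String orderId, String orderDate, double balance) {
            this.orderId = orderId;
            this.orderDate = orderDate;
            this.balance = balance;
        }
    }

    /** An order paired with the balance of the next order for the same key. */
    static final class WithLead {
        final ClientOrder order;
        final Double leadBalance; // null for the last order, like SQL lead()

        WithLead(ClientOrder order, Double leadBalance) {
            this.order = order;
            this.leadBalance = leadBalance;
        }
    }

    /**
     * Emulates lead(balance, 1) over (order by order_Date) for the values
     * of one key, i.e. the Iterable produced by GroupByKey for one orderId.
     */
    static List<WithLead> lead(Iterable<ClientOrder> groupedValues) {
        // Copy into a list: the grouped Iterable has no guaranteed order.
        List<ClientOrder> sorted = new ArrayList<>();
        for (ClientOrder o : groupedValues) {
            sorted.add(o);
        }
        sorted.sort(Comparator.comparing((ClientOrder o) -> o.orderDate));

        List<WithLead> out = new ArrayList<>(sorted.size());
        for (int i = 0; i < sorted.size(); i++) {
            Double next = null;
            if (i + 1 < sorted.size()) {
                next = Double.valueOf(sorted.get(i + 1).balance);
            }
            out.add(new WithLead(sorted.get(i), next));
        }
        return out;
    }
}
```

In a pipeline, a method like this would presumably be called from a DoFn applied to each KV<String, Iterable<Client_Orders>> element after the GroupByKey, emitting one output per order. A lag is the same loop looking at index i - 1 instead of i + 1.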