I am writing a pipeline that processes product events (create, update, delete). Each product belongs to a sale that has a certain duration. I want to be able to perform some aggregation on all the products in a given sale. For the purpose of this example, let's assume I just want a list of unique product IDs per sale.
Therefore, my pipeline uses session windows keyed on the sale ID with a very long gap duration (so that when the sale closes and no more product updates are published, the window for that sale closes too). My question is: how do I write a unit test for that?
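For reference, Beam's `Sessions` window function assigns each element a proto-window `[timestamp, timestamp + gap)` and, per key, merges proto-windows that overlap. The following is a plain-Java illustration of that merging rule, not Beam's implementation; the class and method names are hypothetical, and timestamps and the gap are in milliseconds:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration of session merging for a single key (not Beam code). */
class SessionMergeSketch {

    /** Merges per-element proto-windows [ts, ts + gap) into sessions; each entry is {start, end} in ms. */
    static List<long[]> sessions(List<Long> timestampsMillis, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestampsMillis);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long protoStart = ts;
            long protoEnd = ts + gapMillis;
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && last[1] > protoStart) {
                // Overlaps the current session: extend its end if needed.
                last[1] = Math.max(last[1], protoEnd);
            } else {
                // No overlap: this element starts a new session.
                result.add(new long[] {protoStart, protoEnd});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long gap = 600_000L; // 600 seconds expressed in milliseconds
        for (long[] w : sessions(List.of(0L, 0L, 0L, 710L), gap)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")"); // [0, 600710)
        }
        for (long[] w : sessions(List.of(0L, 0L, 0L, 710_000L), gap)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")"); // [0, 600000) then [710000, 1310000)
        }
    }
}
```

With a 600-second gap, an element 710 ms after the others still overlaps the first session, whereas one 710 seconds later starts a new session.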
For the sake of this test, let's assume the following:
- the events are just Strings with the sale ID and the product ID, separated by a space;
- the `applyDistinctProductsTransform` will basically perform what I've described above: create `KV<String, String>` elements where the key is the sale ID, set session windows with a gap duration of 600 seconds, and finally create a concatenated string of all product IDs per sale.
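To make the intended aggregation concrete, here is a plain-Java sketch of the per-sale logic with windowing ignored entirely. `DistinctProductsSketch` is a hypothetical name, not the actual `applyDistinctProductsTransform`; it assumes the `"saleId productId"` format from the list above and sorts product IDs so the output is deterministic:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of the per-sale distinct-products aggregation, without windowing. */
class DistinctProductsSketch {

    static Map<String, String> distinctProductsPerSale(List<String> events) {
        // TreeMap/TreeSet keep sale IDs and product IDs sorted, and TreeSet drops duplicates.
        Map<String, TreeSet<String>> bySale = new TreeMap<>();
        for (String event : events) {
            String[] parts = event.split(" ", 2); // {saleId, productId}
            bySale.computeIfAbsent(parts[0], k -> new TreeSet<>()).add(parts[1]);
        }
        Map<String, String> result = new TreeMap<>();
        bySale.forEach((sale, products) -> result.put(sale, String.join(",", products)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(distinctProductsPerSale(List.of(
            "sale1 product1", "sale1 product2", "sale1 product3", "sale1 product2")));
        // {sale1=product1,product2,product3}
    }
}
```

Sorting is a choice made here for determinism. Inside Beam, the order in which a combine sees elements is not guaranteed, so an assertion on a concatenated string such as `product1,product2,product3` only holds reliably if the real transform also sorts (or if the test compares sets instead).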
Here is what I have so far: I create a `TestStream` and add some elements: 3 products for `sale1`. Next, I advance the watermark to 700, well beyond the gap duration. Another product is added, and finally the watermark is advanced to infinity.
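A detail that may matter here: the `Instant` used with Beam's `TestStream` is Joda-Time's `org.joda.time.Instant`, whose `Instant(long)` constructor takes milliseconds since the epoch. So `new Instant(700)` means 700 ms, while the gap is 600 seconds. The sketch below uses `java.time` as a stdlib stand-in for Joda-Time (an assumption for illustration only; the test itself uses Joda-Time) to compare the two readings of "700" against a session that starts at 0:

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch: java.time stands in for Joda-Time here; both count epoch milliseconds. */
class WatermarkUnitsSketch {

    /** True if the watermark has reached the end of a session [start, start + gap) with no further elements. */
    static boolean watermarkPassedSession(Instant start, Duration gap, Instant watermark) {
        Instant sessionEnd = start.plus(gap);
        return !watermark.isBefore(sessionEnd);
    }

    public static void main(String[] args) {
        Instant start = Instant.ofEpochMilli(0);
        Duration gap = Duration.ofSeconds(600);
        // Same value as Joda's new Instant(700): 700 milliseconds.
        System.out.println(watermarkPassedSession(start, gap, Instant.ofEpochMilli(700)));  // false
        // 700 seconds, which is what "well beyond the gap" would require.
        System.out.println(watermarkPassedSession(start, gap, Instant.ofEpochSecond(700))); // true
    }
}
```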
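```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.junit.Test;

// `pipeline` is assumed to be a TestPipeline field of the test class, e.g.
// @Rule public final transient TestPipeline pipeline = TestPipeline.create();
// `applyDistinctProductsTransform` is the transform under test (defined elsewhere).

@Test
public void testSessionWindow() {
    Coder<String> utfCoder = StringUtf8Coder.of();
    TestStream<String> onTimeProducts =
        TestStream.create(utfCoder)
            .addElements(
                TimestampedValue.of("sale1 product1", new Instant(0)),
                TimestampedValue.of("sale1 product2", new Instant(0)),
                TimestampedValue.of("sale1 product3", new Instant(0)))
            .advanceWatermarkTo(new Instant(700)) // watermark passes trigger time
            .addElements(
                TimestampedValue.of("sale1 product9", new Instant(710)))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, String>> results =
        applyDistinctProductsTransform(pipeline, onTimeProducts);

    PAssert.that(results).containsInAnyOrder(
        KV.of("sale1", "product1,product2,product3"),
        KV.of("sale1", "product9"));

    pipeline.run().waitUntilFinish();
}
```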
However:
- the pipeline outputs a single KV with key `sale1` and value `product1,product2,product3,product9`, so `product9` is appended to the same window. I would've expected this product to be processed in a separate window and hence end up in a different row of the output PCollection.
- how can I get only the results of a single window in the PAssert? I know there is the `inWindow` function, and I've found an example for a fixed time window, but I don't know how to do the same for a session window.
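For asserting on a single session, one possible approach (a sketch, not verified against the linked test) is to compute the merged window's bounds: for elements that all landed in one session, the window spans from the earliest timestamp to the latest timestamp plus the gap. A plain-Java version of that computation, with hypothetical names:

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: bounds of one merged session window, in milliseconds. */
class SessionBoundsSketch {

    /**
     * Returns {start, end} for a group of timestamps that are assumed to form ONE session
     * (no two consecutive timestamps further apart than the gap).
     */
    static long[] sessionBounds(List<Long> timestampsMillis, long gapMillis) {
        long start = Collections.min(timestampsMillis);
        long end = Collections.max(timestampsMillis) + gapMillis;
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        long[] bounds = sessionBounds(List.of(0L, 0L, 0L), 600_000L);
        System.out.println(bounds[0] + " .. " + bounds[1]); // 0 .. 600000
    }
}
```

Those bounds could then be passed to an `IntervalWindow` in the assertion, for example `PAssert.that(results).inWindow(new IntervalWindow(new Instant(0), new Instant(600_000))).containsInAnyOrder(KV.of("sale1", "product1,product2,product3"));`, where `IntervalWindow` is `org.apache.beam.sdk.transforms.windowing.IntervalWindow` and the `Instant` values are milliseconds.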
You can check out the full code of the PTransform and the unit test.