I am writing a pipeline that processes product events (create, update, delete). Each product belongs to a sale that has a certain duration. I want to be able to perform some aggregation on all the products in a given sale. For the purpose of this example, let's assume I just want a list of unique product IDs per sale.

Therefore, my pipeline uses session windows keyed on the sale ID with a very long gap duration (so when the sale closes and no more product updates are being published, the window for that sale closes too). My question is, how do I write a unit test for that?

For the sake of this test, let's assume the following:

  • the events are just Strings with the sale ID and the product ID, separated by a space,
  • applyDistinctProductsTransform does what I described above: it creates KV<String, String> elements where the key is the sale ID, applies session windows with a gap duration of 600 seconds, and finally builds a concatenated string of all product IDs per sale.

Here is what I have so far:

I create a TestStream and add some elements: 3 products for sale1. Next, I advance the watermark to 700, well beyond the gap duration. Another product is added and finally the watermark is advanced to infinity.

@Test
public void TestSessionWindow() {
    Coder<String> utfCoder = StringUtf8Coder.of();
    TestStream<String> onTimeProducts = TestStream.create(utfCoder)
            .addElements(
                    TimestampedValue.of("sale1 product1", new Instant(0)),
                    TimestampedValue.of("sale1 product2", new Instant(0)),
                    TimestampedValue.of("sale1 product3", new Instant(0)))
            .advanceWatermarkTo(new Instant(700)) // watermark passes trigger time
            .addElements(
                    TimestampedValue.of("sale1 product9", new Instant(710)))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, String>> results = applyDistinctProductsTransform(pipeline, onTimeProducts);

    PAssert.that(results).containsInAnyOrder(
            KV.of("sale1", "product1,product2,product3"),
            KV.of("sale1", "product9"));
    pipeline.run().waitUntilFinish();
}

However,

  1. the pipeline outputs a KV of sale1, product1,product2,product3,product9 so product9 is appended to the window. I would've expected this product to be processed in a separate window and hence end up in a different row in the output PCollection.
  2. How can I get only the results of a single window in the PAssert? I know there is the inWindow function, and I've found an example for a fixed time window, but I don't know how to do the same for a session window.

You can check out the full code of the PTransform and the unit test.

1 Answer

1) I believe you have a simple unit issue. The window gap duration of 600 is specified in seconds (Duration.standardSeconds), yet new Instant(long) takes milliseconds. The 600-second gap is therefore much larger than the 700-millisecond interval in your test, which causes the sessions to be merged.
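
To make the arithmetic concrete, here is a minimal plain-Java sketch of the overlap check. It does not use Beam: java.time.Duration stands in for the Joda-Time Duration used in the test, and the class and method names (SessionGapUnits, sameSession) are made up for illustration. It assumes two elements with the same key fall into one session when the later timestamp is less than one gap after the earlier one.

```java
import java.time.Duration;

public class SessionGapUnits {
    // Hypothetical helper: two per-element session windows [t, t + gap)
    // overlap (and would be merged) when the timestamps are closer than the gap.
    public static boolean sameSession(long firstMillis, long secondMillis, Duration gap) {
        return Math.abs(secondMillis - firstMillis) < gap.toMillis();
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(600); // 600 s = 600,000 ms

        // As in the test: 0 and 710 are interpreted as milliseconds.
        System.out.println(sameSession(0L, 710L, gap)); // prints "true"

        // 710 seconds, converted to milliseconds, lies beyond the gap.
        long later = Duration.ofSeconds(710).toMillis();
        System.out.println(sameSession(0L, later, gap)); // prints "false"
    }
}
```

In the test itself, one way to get the intended spacing would presumably be to build the later timestamps from a duration, for example new Instant(0).plus(Duration.standardSeconds(710)), and to advance the watermark by a comparable amount.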

2) Sessions still use interval windows internally, so you need to compute what the output window will be after all sessions are merged, given your trigger strategy, and pass that window to inWindow. By default, each element in a session window is assigned IntervalWindow(timestamp, gap duration), and all overlapping windows are merged to create a larger window. For example, if you had the windows (start time, end time) [10, 14], [12, 18], and [4, 14] for the same session key, they would all be merged, producing a single [4, 18] window.
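
The merge in that example can be sketched in plain Java. This is only an illustration of the interval arithmetic, not Beam's actual implementation; SessionMergeSketch and merge are hypothetical names, and each window is represented as a two-element long array {start, end}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {
    // Merges overlapping windows; each long[] holds {start, end}.
    public static List<long[]> merge(List<long[]> windows) {
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));

        List<long[]> merged = new ArrayList<>();
        for (long[] w : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] < last[1]) {
                // Overlaps the previous window: extend its end if needed.
                last[1] = Math.max(last[1], w[1]);
            } else {
                // Disjoint from everything so far: start a new session.
                merged.add(new long[] {w[0], w[1]});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<long[]> windows = Arrays.asList(
                new long[] {10, 14}, new long[] {12, 18}, new long[] {4, 14});
        for (long[] w : merge(windows)) {
            System.out.println("[" + w[0] + ", " + w[1] + "]"); // prints "[4, 18]"
        }
    }
}
```

Applied to the test in the question, the three elements at timestamp 0 each start with the same window of one gap duration, so the merged window is that same interval. An IntervalWindow with those bounds is what you would hand to inWindow for the first session.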