
I am building out an Apache Beam (v2.0) pipeline, to be run in Google Cloud Dataflow. The intended flow is:

  • Events stream (unbounded data source) in from Pub/Sub. They are simple JSON objects, with a sessionId property.
  • Use a custom DoFn to transform events into KV<String, String>, where the key is the sessionId and the value is the whole JSON object.
  • Window events using Session windows (gap duration of 2 seconds for development, will be ~30 mins in production).
  • For now, just print the result that is emitted from each window

Here is the pipeline code:

    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply(PubsubIO
            .readStrings()
            .fromSubscription(options.getSubscription()))

        .apply("AddKeyFn", ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new Gson();
                String key = (String) gson.fromJson(c.element(), HashMap.class).get("sessionId");
                c.output(KV.of(key, c.element()));
            }
        }))

        .apply(Window.<KV<String, String>>into(
            Sessions.withGapDuration(Duration.standardSeconds(2))))

        .apply("PrintFn", ParDo.of(new DoFn<KV<String, String>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println("****");
                System.out.println(c.element());
                System.out.println(c.timestamp());
            }
        }));

    return pipeline.run();

I would like the Window function to emit results every time a session ends, for each session (based on the key). For testing purposes I am using the pub/sub emulator and just sending data over random intervals.

So, for instance, if the following data was sent to pub/sub:

{"sessionId": "2", "data": "data9583", "timestamp": 1507293681}
{"sessionId": "3", "data": "data5220", "timestamp": 1507293683}
{"sessionId": "6", "data": "data2998", "timestamp": 1507293684}
{"sessionId": "3", "data": "data3820", "timestamp": 1507293684}
{"sessionId": "6", "data": "data5728", "timestamp": 1507293685}
{"sessionId": "6", "data": "data7173", "timestamp": 1507293686}
{"sessionId": "4", "data": "data8800", "timestamp": 1507293687}

The Window function should emit the following:

  • 1st window: contains event with sessionId=2
  • 2nd window: contains 2 events with sessionId=3
  • 3rd window: contains 3 events with sessionId=6
  • 4th window: contains 1 event with sessionId=4

The idea here is that:

  • Windows will only emit once the session is "complete", which is to say {gapDuration} has passed since the last event with that sessionId
  • Each window will contain events from a single session (because we have passed KV<String, String> into the Window function)

The window function above is pulled directly from Beam documentation.

What I am actually seeing is:

  • Every event gets printed immediately upon hitting pub/sub, so the pipeline isn't even waiting for {gapDuration} to emit windows
  • Each print statement contains a single event

Worth noting: if I add a custom CombineFn (which simply turns the JSON objects into an array of JSON objects), nothing makes it to the CombineFn or the PrintFn (I added a print statement inside the CombineFn).

I am assuming that triggering has something to do with this, but I can't seem to find anything useful to set me in the right direction (there is surprisingly little example code out there for Beam, especially for v2.0).

My questions:

  • Is my desired behaviour possible?
  • If so, what am I missing? Is this approach at least on the right track?
  • If anyone can point me to a good source of example code for a variety of Beam pipeline use cases, that would be great!


1 Answer


First of all, window functions that merge windows (such as session windows) require an aggregation operation, such as GroupByKey or Combine, to be applied after the Window transform; without one, the windows are never merged and elements pass straight through. This is discussed in the Beam programming guide under Windowing Basics.
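Concretely, inserting a GroupByKey between the Window and print steps makes the sessions merge and fire once per completed session. A minimal sketch based on the question's pipeline (note the downstream DoFn now receives KV<String, Iterable<String>>):

```java
        .apply(Window.<KV<String, String>>into(
            Sessions.withGapDuration(Duration.standardSeconds(2))))

        // GroupByKey is the aggregation that forces the session windows to
        // merge; it emits one KV<String, Iterable<String>> per key, per
        // completed session, once the watermark passes the session's end.
        .apply(GroupByKey.<String, String>create())

        .apply("PrintFn", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println("Session for key " + c.element().getKey()
                        + ": " + c.element().getValue());
            }
        }));
```

With this in place, each printed line should correspond to one session per key, matching the expected output in the question.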

Second of all, PubsubIO by default (as you are using it) assigns each element a timestamp based on when it was published. Since you have an explicit timestamp field, you may want to look into publishing these elements with a timestamp attribute, and reading them using the withTimestampAttribute method. That way, the attribute you publish with is used as the element's event-time timestamp, so your session gaps are computed from your own timestamps rather than publish times.
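A sketch of the read side, assuming the publisher sets an attribute named "ts" on each message (the attribute name "ts" is an arbitrary choice; it just has to match what the publisher sets, formatted as epoch milliseconds or RFC 3339 per the Beam docs):

```java
    pipeline.apply(PubsubIO
            .readStrings()
            .fromSubscription(options.getSubscription())
            // Use the "ts" message attribute (hypothetical name) as the
            // element's event-time timestamp instead of the publish time.
            .withTimestampAttribute("ts"))
```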