I am building an Apache Beam (v2.0) pipeline to be run on Google Cloud Dataflow. The intended flow is:
- Events stream in from Pub/Sub (an unbounded source). They are simple JSON objects with a sessionId property.
- Use a custom DoFn to map events into KV<String, String>, where the key is the sessionId and the value is the whole JSON object.
- Window the events using session windows (gap duration of 2 seconds for development; it will be ~30 minutes in production).
- For now, just print the result that is emitted from each window.
Here is the pipeline code:
Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply(PubsubIO
        .readStrings()
        .fromSubscription(options.getSubscription()))
    .apply("AddKeyFn", ParDo.of(new DoFn<String, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Gson gson = new Gson();
            String key = (String) gson.fromJson(c.element(), HashMap.class).get("sessionId");
            c.output(KV.of(key, c.element()));
        }
    }))
    .apply(Window.<KV<String, String>>into(
        Sessions.withGapDuration(Duration.standardSeconds(2))))
    .apply("PrintFn", ParDo.of(new DoFn<KV<String, String>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("****");
            System.out.println(c.element());
            System.out.println(c.timestamp());
        }
    }));

return pipeline.run();
I would like the windowing to emit results every time a session ends, for each session (i.e. per key). For testing purposes I am using the Pub/Sub emulator and sending data at random intervals.
So, for instance, if the following data was sent to pub/sub:
{"sessionId": "2", "data": "data9583", "timestamp": 1507293681}
{"sessionId": "3", "data": "data5220", "timestamp": 1507293683}
{"sessionId": "6", "data": "data2998", "timestamp": 1507293684}
{"sessionId": "3", "data": "data3820", "timestamp": 1507293684}
{"sessionId": "6", "data": "data5728", "timestamp": 1507293685}
{"sessionId": "6", "data": "data7173", "timestamp": 1507293686}
{"sessionId": "4", "data": "data8800", "timestamp": 1507293687}
The Window function should emit the following:
- 1st window: contains 1 event with sessionId=2
- 2nd window: contains 2 events with sessionId=3
- 3rd window: contains 3 events with sessionId=6
- 4th window: contains 1 event with sessionId=4
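To sanity-check those expectations, here is a stdlib-only Java sketch (my own toy code, not Beam's implementation) that groups the sample events above into per-key sessions with a 2-second gap, which reproduces the four windows listed:

```java
import java.util.*;

public class SessionToy {
    // Toy session grouping: for each key, sort timestamps and merge
    // consecutive events into one session while the gap between them
    // is smaller than gapSeconds. Each event is {key, epochSeconds}.
    static Map<String, List<List<Long>>> sessions(List<String[]> events, long gapSeconds) {
        Map<String, List<Long>> byKey = new TreeMap<>();
        for (String[] e : events) {
            byKey.computeIfAbsent(e[0], k -> new ArrayList<>()).add(Long.parseLong(e[1]));
        }
        Map<String, List<List<Long>>> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> entry : byKey.entrySet()) {
            List<Long> ts = entry.getValue();
            Collections.sort(ts);
            List<List<Long>> windows = new ArrayList<>();
            List<Long> current = new ArrayList<>();
            for (long t : ts) {
                // A gap of >= gapSeconds since the previous event closes the session.
                if (!current.isEmpty() && t - current.get(current.size() - 1) >= gapSeconds) {
                    windows.add(current);
                    current = new ArrayList<>();
                }
                current.add(t);
            }
            windows.add(current);
            result.put(entry.getKey(), windows);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
            new String[]{"2", "1507293681"},
            new String[]{"3", "1507293683"},
            new String[]{"6", "1507293684"},
            new String[]{"3", "1507293684"},
            new String[]{"6", "1507293685"},
            new String[]{"6", "1507293686"},
            new String[]{"4", "1507293687"});
        // One session per key, with sizes 1, 2, 3, 1 for keys 2, 3, 6, 4.
        System.out.println(sessions(events, 2));
    }
}
```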
The idea here is that:
- Windows will only emit once the session is "complete", which is to say gapDuration has passed since the last event with that sessionId.
- Each window will contain events from a single session (because we have passed KV<String, String> into the Window function).
The window function above is pulled directly from Beam documentation.
What I am actually seeing is:
- Every event gets printed immediately upon hitting Pub/Sub, so the pipeline isn't even waiting gapDuration before emitting windows.
- Each print statement contains a single event.
Worth noting: if I add a custom CombineFn (which simply turns the JSON objects into an array of JSON objects), nothing makes it to the CombineFn or to the PrintFn (I added a print statement inside the CombineFn).
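For reference, the merge that CombineFn is meant to perform is trivial; here is a stdlib-only sketch of the same logic (plain Java, not the Beam CombineFn API, with its accumulator plumbing omitted):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ConcatSketch {
    // Merge a session's JSON object strings into one JSON array string,
    // which is all the CombineFn is intended to do.
    static String toJsonArray(List<String> jsonObjects) {
        return jsonObjects.stream().collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        // Wraps the two objects into a single JSON array string.
        System.out.println(toJsonArray(List.of(
            "{\"sessionId\": \"6\", \"data\": \"data5728\"}",
            "{\"sessionId\": \"6\", \"data\": \"data7173\"}")));
    }
}
```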
I am assuming that triggering has something to do with this, but I can't seem to find anything useful to set me in the right direction (there is surprisingly little example code out there for Beam, especially for v2.0).
My questions:
- Is my desired behaviour possible?
- If so, what am I missing? Is this approach at least on the right track?
- If anyone can point me to a good source of example code for a variety of Beam pipeline use cases, that would be great!
Resources I've scoured with no success:
- The World Beyond Batch: Streaming 101 & 102
- "Complete" Examples from Beam Github
- Beam JavaDoc