0
votes

We are working with a streaming, unbounded PCollection in Google Dataflow that originates from a Cloud Pub/Sub subscription. We use it as a firehose that simply delivers events to Bigtable continuously. The delivery itself is performing nicely.

Our problem is that we have downstream batch jobs that expect to read a day's worth of data out of Bigtable once it has been delivered. I would like to use windowing and triggering to implement a side effect that writes a marker row to Bigtable when the watermark advances beyond the day threshold, indicating that Dataflow has reason to believe most of the events have been delivered (we don't need strong guarantees on completeness, just reasonable ones) and that downstream processing can begin.

What we've tried is to write out the raw events to one sink in the pipeline, and then window into another sink, using the timing information in the pane to determine whether the watermark has advanced. The problem with this approach is that it operates on the raw events themselves a second time, which is undesirable since it would repeat writing the event rows. We can prevent this write, but the parallel path in the pipeline would still be operating over the windowed streams of events.

Is there an efficient way to attach a callback of sorts to the watermark, so that we can perform a single action when the watermark advances?

1 Answer

2
votes

The general ability to set a timer in event time and receive a callback is definitely an important feature request, filed as BEAM-27, which is under active development.

But actually, your approach of windowing into FixedWindows.of(Duration.standardDays(1)) seems like it will accomplish your goal using just the features of the Dataflow Java SDK 1.x. Instead of forking your pipeline, you can maintain the "firehose" behavior by adding the trigger AfterPane.elementCountAtLeast(1). This does incur the cost of a GroupByKey, but it does not duplicate any of the event writes.
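To make the "day threshold" concrete, here is a minimal plain-Java sketch of how a one-day fixed window with the default zero offset lines up with event timestamps: windows are aligned to the epoch, so each one runs from midnight UTC to the next midnight UTC. The class and method names (DailyWindowSketch, windowStart, windowEnd) are made up for illustration and are not part of the SDK; this only models the arithmetic.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java model of epoch-aligned daily window assignment.
// Hypothetical helper for illustration; not SDK code.
final class DailyWindowSketch {
    static final long DAY_MILLIS = Duration.ofDays(1).toMillis();

    // Start (inclusive) of the daily window that contains the event timestamp.
    static Instant windowStart(Instant eventTime) {
        long millis = eventTime.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch too.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, DAY_MILLIS));
    }

    // End (exclusive) of that window; the day is considered complete
    // once the watermark has moved past this instant.
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plusMillis(DAY_MILLIS);
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2016-03-01T17:45:00Z");
        System.out.println("window start: " + windowStart(event));
        System.out.println("window end:   " + windowEnd(event));
    }
}
```

If your downstream jobs define a "day" in a local time zone rather than UTC, the window boundaries would need a matching offset, which this sketch does not cover.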

The complete pipeline might look like this:

pipeline
    // Read your data from Cloud Pubsub and parse to MyValue
    .apply(PubsubIO.Read.topic(...).withCoder(MyValueCoder.of()))

    // You'll need some keys
    .apply(WithKeys.<MyKey, MyValue>of(...))

    // Window into daily windows, but still output as fast as possible
    .apply(Window.into(FixedWindows.of(Duration.standardDays(1)))
                 .triggering(AfterPane.elementCountAtLeast(1)))

    // GroupByKey adds the necessary EARLY / ON_TIME / LATE labeling
    .apply(GroupByKey.<MyKey, MyValue>create())

    // Convert KV<MyKey, Iterable<MyValue>>
    // to KV<ByteString, Iterable<Mutation>>
    // where the iterable of mutations has the "end of day" marker if
    // it was ON_TIME
    .apply(MapElements.via(new MessageToMutationWithEndOfWindow()))

    // Write it!
    .apply(BigTableIO.Write.to(...));
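The conversion step is the only place where the marker logic lives, so here is a rough plain-Java sketch of the decision it would make. Everything in it is an assumption for illustration: the Timing enum is a stand-in for the SDK's pane timing labels, strings stand in for Bigtable mutations, and END_OF_DAY_MARKER and toMutations are hypothetical names. In a real pipeline the timing label would come from the pane information available in a DoFn's processing context, so the transform would probably need to be a ParDo over a DoFn.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the "append a marker when the pane is ON_TIME" step.
// Hypothetical names throughout; not SDK code.
final class EndOfWindowSketch {
    // Stand-in for the pane timing labels attached by GroupByKey.
    enum Timing { EARLY, ON_TIME, LATE }

    // Stand-in for the marker-row mutation written once per window.
    static final String END_OF_DAY_MARKER = "END_OF_DAY";

    // Turns one fired pane of events into the mutations to write.
    static List<String> toMutations(List<String> events, Timing timing) {
        List<String> mutations = new ArrayList<>();
        for (String event : events) {
            mutations.add("PUT " + event); // one write per raw event
        }
        if (timing == Timing.ON_TIME) {
            // The watermark has reached the end of the window:
            // signal downstream jobs that the day looks complete.
            mutations.add(END_OF_DAY_MARKER);
        }
        return mutations;
    }

    public static void main(String[] args) {
        List<String> pane = List.of("event-1", "event-2");
        System.out.println(toMutations(pane, Timing.EARLY));
        System.out.println(toMutations(pane, Timing.ON_TIME));
    }
}
```

Because only the ON_TIME pane carries the marker, EARLY and LATE panes still write their events without signalling completion.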

Please do comment on my answer if I have missed some detail of your use case.