
I have a Pub/Sub topic and subscription and want to consume and aggregate the unbounded data from the subscription in a Dataflow pipeline. I use a fixed window and write the aggregates to BigQuery.

Reading and writing (without windowing or aggregation) work fine. But when I pipe the data into a fixed window (to count the elements in each window), the window never triggers, and thus the aggregates are never written.

Here is my word publisher (it uses kinglear.txt from the examples as input file):

public static class AddCurrentTimestampFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Stamp each element with the current wall-clock time.
        c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
    }
}

public static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Split each line into words and emit the non-empty ones.
        String[] words = c.element().split("[^a-zA-Z']+");
        for (String word : words) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}

// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();

Here is my windowed word counter:

Pipeline p = Pipeline.create(o); // 'o' are the pipeline options

BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
        .withSchema(o.getSchema())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

Window.Bound<String> w = Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)));

p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
        .apply("FixedWindow", w)
        .apply("CountWords", Count.<String>perElement())
        .apply("CreateRows", ParDo.of(new WordCountToRowFn()))
        .apply("WriteRows", tablePipe);
p.run();
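
(WordCountToRowFn is referenced above but not shown; here is a minimal sketch of what it presumably does, turning each KV<String, Long> count into a BigQuery TableRow. The column names word and count are assumptions and would need to match the schema returned by o.getSchema().)

public static class WordCountToRowFn extends DoFn<KV<String, Long>, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Assumed schema: one row per (word, count) pair.
        c.output(new TableRow()
                .set("word", c.element().getKey())
                .set("count", c.element().getValue()));
    }
}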

The above subscriber does not work: the window never seems to fire with the default trigger. However, if I define a trigger manually, the code works and the counts are written to BigQuery.

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();

I would like to avoid specifying custom triggers if possible.

Questions:

  1. Why does my solution not work with Dataflow's default trigger?
  2. How do I need to change my publisher or subscriber so that windows fire with the default trigger?
Do you have a job ID? — jkff

Note that your trigger will "finish" after a single firing, dropping all further data. To match the default trigger, you would want Repeatedly.forever(AfterWatermark.pastEndOfWindow()) (which won't actually repeat if your allowed lateness is zero). — Kenn Knowles

Just tested with this trigger, but the watermark-based approach is not working in the described scenario. The internal Combine.GroupedValues action of the "CountWords" step is never executed, even after leaving the pipeline running for more than 10 minutes; the GroupByKey action of "CountWords" is the last action executed. I will now investigate the timestampLabel approach proposed by Ben. @jkff: the job ID is 2017-01-04_07_33_56-14457038567248221079. — Juve
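
For reference, the default-equivalent trigger Kenn describes, written out against the windowing snippet above (a sketch; Repeatedly and AfterWatermark are the SDK's standard trigger builders):

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        // Fire when the watermark passes the end of the window, and keep
        // firing for any late data (a no-op here, since allowed lateness is zero).
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();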

1 Answer


How are you determining the trigger never fires?

Your PubsubIO.Write and PubsubIO.Read transforms should both specify a timestamp label using withTimestampLabel; otherwise the timestamps you've added will not be written to Pub/Sub, and the publish times will be used instead.
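
For example (a sketch against the Dataflow SDK 1.x-style API used in the question, where the method is called timestampLabel; later Beam releases call it withTimestampAttribute. The attribute name "ts" is arbitrary):

p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
        .apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
        .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
        // Record each element's timestamp in a Pub/Sub attribute.
        .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()).timestampLabel("ts"));

// On the subscriber side, read the same attribute back as the element timestamp,
// so the watermark is derived from it rather than from the publish times:
p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()).timestampLabel("ts"))
        .apply("FixedWindow", w);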

Either way, the pipeline's input watermark is derived from the timestamps of the elements waiting in Pub/Sub. Once all inputs have been processed, the watermark holds back for a few minutes (in case the publisher was delayed) before advancing to real time.

What you are likely seeing is that all the elements are published into the same ~1-second window (since the input file is pretty small). They are all read and processed relatively quickly, but the 1-second window they are put in will not trigger until the input watermark has advanced past it, indicating that all data for that window has been consumed.

That won't happen for several minutes, which may make it look like the trigger isn't working. The trigger you wrote fires after 1 second of processing time, which is much earlier, but there is no guarantee that all the data has been processed by then.

Steps to get better behavior from the default trigger:

  1. Use withTimestampLabel on both the Pub/Sub write and read steps.
  2. Have the publisher spread the timestamps out further (e.g., run for several minutes and spread the timestamps across that range), as in the sketch below.
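
One way to do the second step (a hypothetical sketch, not from the original answer: a variant of AddCurrentTimestampFn that spaces timestamps one second apart so successive elements land in distinct 1-second windows):

public static class AddSpreadTimestampFn extends DoFn<String, String> {
    // Hypothetical helper: a per-instance counter pushes each element's
    // timestamp one second further into the future. Good enough for a demo;
    // not deterministic across workers or retries.
    private final Instant start = Instant.now();
    private long offsetSeconds = 0;

    @ProcessElement
    public void processElement(ProcessContext c) {
        c.outputWithTimestamp(c.element(),
                start.plus(Duration.standardSeconds(offsetSeconds++)));
    }
}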