I am reading records from a Pub/Sub topic in a Dataflow pipeline. The records are divided into fixed windows and then grouped within each window. Because we need to process these records in order, each window is sorted on the sequence number using beam.SortValues. I then write the records to Cloud Bigtable.
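The per-window ordering described above can be modeled in plain Java to make the semantics explicit. This is a sketch under stated assumptions, not Beam code: it assumes each record carries an event-time timestamp in milliseconds and a sequence number, it handles a single key, and every name in it (`WindowSortSketch`, `Rec`, `windowStart`, `groupAndSort`) is hypothetical. In the real pipeline, `FixedWindows` does the window assignment, `GroupByKey` groups per key and window, and the sort step (beam.SortValues in the question) does the ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (hypothetical, no Beam) of "assign to fixed windows, then sort each window". */
final class WindowSortSketch {

    /** Hypothetical record shape: event-time timestamp (ms), sequence number, payload. */
    static final class Rec {
        final long eventTimeMillis;
        final long seq;
        final String payload;

        Rec(long eventTimeMillis, long seq, String payload) {
            this.eventTimeMillis = eventTimeMillis;
            this.seq = seq;
            this.payload = payload;
        }
    }

    /** Start of the fixed window that contains ts; floorMod keeps negative timestamps correct. */
    static long windowStart(long ts, long windowSizeMillis) {
        return ts - Math.floorMod(ts, windowSizeMillis);
    }

    /** Groups records (single key assumed) by window start, then orders each window by seq. */
    static Map<Long, List<Rec>> groupAndSort(List<Rec> records, long windowSizeMillis) {
        Map<Long, List<Rec>> windows = new TreeMap<>();
        for (Rec r : records) {
            windows.computeIfAbsent(windowStart(r.eventTimeMillis, windowSizeMillis),
                    k -> new ArrayList<>()).add(r);
        }
        for (List<Rec> w : windows.values()) {
            w.sort((a, b) -> Long.compare(a.seq, b.seq));
        }
        return windows;
    }
}
```

For example, with 1-minute windows, records at 1 s (seq 1) and 5 s (seq 2) land in the window starting at 0, while records at 61 s (seq 3) and 62 s (seq 1) land in the window starting at 60 000 ms and come out ordered seq 1, then seq 3.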

The problem is with the pipeline's Data Freshness and System Lag: data freshness seems to be stuck at a fixed point, and the watermark stops advancing.

I am using the following windowing strategy to emit records after the GroupByKey step:

PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords
        .apply("Raw to String", ParDo.of(new LogsFn()))
        .apply("Window", Window
                .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(AfterFirst.of(
                        AfterPane.elementCountAtLeast(500),
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(1)))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes());

I think the issue might be with the windowing strategy. Basically, I want to do the following: read records from Pub/Sub into fixed windows of 1 minute, sort each window, and write to Bigtable. If I use the default trigger, the GroupByKey step doesn't emit any results. Can someone help me with this?

1 Answer

Reading your code, it looks like your trigger delay and window size are reversed. Your current windowing strategy is:

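  1. A 10-second event-time fixed window.
  2. A repeating composite trigger that fires when either 500 elements have arrived in the pane or 1 minute of processing time has passed since the first element in the pane, whichever comes first.
  3. Late events are dropped (allowed lateness is zero), and fired panes are discarded rather than accumulated.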
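A simplified processing-time model of that trigger may help show why it behaves so differently from a 10-second window: a pane fires on the 500th element or 1 minute after its first element, and that delay is six times longer than the window itself. This is a plain-Java sketch, not Beam code; `CompositeTriggerSketch`, `Firing`, and `simulate` are hypothetical names, and the model deliberately ignores the watermark, window expiry, and the fact that Beam keeps a separate pane per key and window.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of
 *   Repeatedly.forever(AfterFirst.of(
 *       AfterPane.elementCountAtLeast(maxCount),
 *       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)))
 * for one key and one window. Ignores the watermark and window expiry.
 */
final class CompositeTriggerSketch {

    /** One emitted pane: the processing time it fired at and how many elements it held. */
    static final class Firing {
        final long atMillis;
        final int count;

        Firing(long atMillis, int count) {
            this.atMillis = atMillis;
            this.count = count;
        }
    }

    /** arrivals: processing-time arrival of each element in ms, ascending. */
    static List<Firing> simulate(long[] arrivals, int maxCount, long delayMillis) {
        List<Firing> firings = new ArrayList<>();
        boolean paneOpen = false;
        long paneStart = 0;
        int count = 0;
        for (long t : arrivals) {
            // The processing-time branch may have fired before this element arrived.
            if (paneOpen && t >= paneStart + delayMillis) {
                firings.add(new Firing(paneStart + delayMillis, count));
                paneOpen = false;
                count = 0;
            }
            if (!paneOpen) {
                // The first element of a new pane starts the processing-time delay.
                paneOpen = true;
                paneStart = t;
            }
            count++;
            // The element-count branch fires as soon as the threshold is reached.
            if (count >= maxCount) {
                firings.add(new Firing(t, count));
                paneOpen = false;
                count = 0;
            }
        }
        // With no further input, the pending processing-time delay fires for the open pane.
        if (paneOpen) {
            firings.add(new Firing(paneStart + delayMillis, count));
        }
        return firings;
    }
}
```

For example, with `maxCount = 3` and a 60-second delay, arrivals at 0, 1, 2, 3 and 70 seconds produce panes at 2 s (3 elements), 63 s (1 element) and 130 s (1 element).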

If you just want 1-minute event-time windows, here's what you need:

PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords
        .apply("Raw to String", ParDo.of(new LogsFn()))
        // 1-minute event-time windows; with no .triggering(), the default watermark trigger applies
        .apply("Window", Window
                .<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardMinutes(1)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes()
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS));

FIRE_ALWAYS is the default OnTimeBehavior, but making it explicit helps readability. If you need the composite trigger, you can add it back in; I suspect you wanted to trigger on 10 seconds of processing time or 500 elements.
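To make the default behavior concrete, here is a plain-Java model (no Beam dependency) of 1-minute fixed windows with the default watermark trigger and zero allowed lateness. `WatermarkSketch` and its methods are hypothetical names, and the strict `>` comparisons are a simplification of Beam's internal timer semantics. What it illustrates: a window's on-time pane is only emitted once the watermark moves past the end of that window, so if the watermark stops advancing, a `GroupByKey` under the default trigger emits nothing, which is consistent with the symptom described in the question.

```java
/**
 * Hypothetical, simplified model of FixedWindows + default (watermark) trigger
 * + withAllowedLateness. All timestamps are event-time milliseconds.
 */
final class WatermarkSketch {

    /** Last timestamp belonging to the fixed window that contains ts. */
    static long windowMaxTimestamp(long ts, long sizeMillis) {
        long start = ts - Math.floorMod(ts, sizeMillis);
        return start + sizeMillis - 1;
    }

    /** Model: the on-time pane is emitted once the watermark is past the window's last timestamp. */
    static boolean onTimePaneEmitted(long ts, long sizeMillis, long watermark) {
        return watermark > windowMaxTimestamp(ts, sizeMillis);
    }

    /** Model: an element is dropped once the watermark is past window end plus allowed lateness. */
    static boolean isDropped(long ts, long sizeMillis, long watermark, long allowedLatenessMillis) {
        return watermark > windowMaxTimestamp(ts, sizeMillis) + allowedLatenessMillis;
    }
}
```

Here `windowMaxTimestamp(61_000, 60_000)` is 119 999, so in this model that window's pane is emitted only after the watermark moves past 119 999 ms; a watermark stuck below that value never releases it.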