
I'm trying to use Apache Beam (via Scio) to run a continuous aggregation over the last 3 days of data (processing time) from a streaming source, and to output results from the earliest active window every 5 minutes. "Earliest" means the window with the earliest start time; "active" means the window's end hasn't yet passed. Essentially I'm after a 'rolling' aggregation, dropping the non-overlapping period between sliding windows.

A visualization of what I'm trying to accomplish with an example sliding window of size 3 days and period 1 day:

early firing -  ^       no firing - x
                |

                               ** stop firing from this window once time passes this point
          ^  ^  ^  ^  ^  ^  ^  ^      
          |  |  |  |  |  |  |  |      ** stop firing from this window once time passes this point
w1:       +====================+^  ^  ^
                  x x x x x x x |  |  |
w2:              +====================+^  ^  ^
                         x x x x x x x |  |  |
w3:                     +====================+

time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->

I've tried sliding windows (size = 3 days, period = 5 min), but they produce a new window for every 3-day/5-minute combination, including windows extending into the future, and emit early results for every one of them. I tried trigger = AfterWatermark.pastEndOfWindow(), but I need early results when the job first starts. I've also tried comparing the pane data (isLast, timestamp, etc.) between windows, but it appears identical across them.
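For scale, a back-of-the-envelope sketch (plain Java, not Beam code) of why the early firings fan out so widely: each element is assigned to size / period overlapping sliding windows.

```java
import java.time.Duration;

public class Fanout {
    // Number of sliding windows of the given (size, period)
    // that contain any single instant.
    static long windowsPerElement(Duration size, Duration period) {
        return size.toMillis() / period.toMillis();
    }

    public static void main(String[] args) {
        // size = 3 days, period = 5 minutes
        System.out.println(
            windowsPerElement(Duration.ofDays(3), Duration.ofMinutes(5))); // prints 864
    }
}
```

That's 864 concurrent windows, all firing early for every element.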

My most recent attempt, which admittedly feels like a hack, attaches window information to each key in a DoFn, re-windows into a fixed window, and then groups and reduces down to the oldest window using the attached data; however, the final reduceByKey doesn't seem to output anything.

DoFn to attach window information

import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.KV
import org.joda.time.Instant

// ValueType is just a case class I'm using for objects

type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]]

class Test extends DoFnT {
  // Window.toString looks like the following:
  // [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z)
  // Extract the window's end instant from that representation.
  def parseWindow(window: String): Instant = {
    Instant.parse(
      window
        .stripPrefix("[")
        .stripSuffix(")")
        .split("\\.\\.")(1))
  }

  @ProcessElement
  def process(
      context: DoFnT#ProcessContext,
      window: BoundedWindow): Unit = {
    context.output(
      KV.of(
        context.element().getKey,
        (context.element().getValue, parseWindow(window.toString))
      )
    )
  }
}

The pipeline:
sc
  .pubsubSubscription(...)
  .keyBy(_.key)
  .withSlidingWindows(
    size = Duration.standardDays(3),
    period = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO,
      trigger = Repeatedly.forever(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(
            AfterProcessingTime
              .pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(1))))))
  .reduceByKey(ValueType.combineFunction())
  .applyPerKeyDoFn(new Test())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO))
  .reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
  .saveAsCustomOutput(
    TextIO.write()...
  )

Any suggestions?

Are you setting early firings for your trigger (withEarlyFirings())? Can you include your pipeline, or a sketch of it, in the question? – chamikara

How often will your windows be created? From your diagram I understand that one window starts per day. Is that right? – rmesteves

They should be created every 5 minutes and span 3 days, though I'm looking for a generic sliding-window solution. Apologies, I just updated the comment above the diagram to explain that it illustrates the concept with a sample sliding window of size 3 days and period 1 day. – iralls

Beam already has support for sliding windows. Can you try using that? github.com/apache/beam/blob/master/sdks/java/core/src/main/java/… – chamikara

As I mentioned above, sliding windows in Beam emit a separate window for every size/period combination and output that data to the rest of the pipeline. I'm trying to output from only one specific sliding window within that group, which doesn't seem to be supported out of the box. – iralls

1 Answer


First, regarding processing time: if you want to window according to processing time, set each element's event time to its processing time. This is perfectly fine: it means the event you are processing is the event of ingesting the record, not the event that the record represents.
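To sketch that idea outside of Beam (plain Java; in Beam you would do the equivalent by assigning timestamps at ingestion, e.g. with outputWithTimestamp, or in Scio with timestampBy — the Timestamped record below is just an illustration, not a Beam type):

```java
import java.time.Instant;

public class Stamp {
    // A record paired with the event time we chose for it.
    record Timestamped<T>(T value, Instant eventTime) {}

    // "Event time = processing time": stamp the record with the wall-clock
    // instant at which we ingest it, rather than a time carried in the payload.
    static <T> Timestamped<T> stampWithIngestionTime(T value) {
        return new Timestamped<>(value, Instant.now());
    }
}
```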

Now you can use sliding windows off-the-shelf to get the aggregation you want, grouped the way you want.

But you are correct that it is a bit of a headache to trigger the way you want. Triggers are not easily expressive enough to say "output the last 3 day aggregation but only begin when the window is 5 minutes from over" and even less able to express "for the first 3 day period from pipeline startup, output the whole time".

I believe a stateful ParDo(DoFn) will be your best choice. State is partitioned per key and window. Since you want to have interactions across 3 day aggregations you will need to run your DoFn in the global window and manage the partitioning of the aggregations yourself. You tagged your question google-cloud-dataflow and Dataflow does not support MapState so you will need to use a ValueState that holds a map of the active 3 day aggregations, starting new aggregations as needed and removing old ones when they are done. Separately, you can easily track the aggregation from which you want to periodically output, and have a timer callback that periodically emits the active aggregation. Something like the following pseudo-Java; you can translate to Scala and insert your own types:

DoFn<KV<String, ValueType>, OutputT> {
  @StateId("activePeriod")
  StateSpec<ValueState<Period>> activePeriod = StateSpecs.value();

  @StateId("accumulators")
  StateSpec<ValueState<Map<Period, Accumulator>>> accumulators = StateSpecs.value();

  @TimerId("nextPeriod")
  TimerSpec nextPeriod = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @TimerId("output")
  TimerSpec outputTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, ValueType> element,
      @TimerId("nextPeriod") Timer nextPeriod,
      @TimerId("output") Timer output,
      @StateId("activePeriod") ValueState<Period> activePeriod,
      @StateId("accumulators") ValueState<Map<Period, Accumulator>> accumulators) {

    // Set nextPeriod if it isn't already running
    // Set output if it isn't already running
    // Set activePeriod if it isn't already set
    // Add the element to the appropriate accumulator
  }

  @OnTimer("nextPeriod")
  public void onNextPeriod(
      @TimerId("nextPeriod") Timer nextPeriod,
      @StateId("activePeriod") ValueState<Period> activePeriod) {

    // Set activePeriod to the next one
    // Clear the period we will never read again
    // Reset the timer (there's a one-time change in this logic after the first window; add a flag for this)
  }

  @OnTimer("output")
  public void onOutput(
      @TimerId("output") Timer output,
      @StateId("activePeriod") ValueState<Period> activePeriod,
      @StateId("accumulators") ValueState<Map<Period, Accumulator>> accumulators) {

    // Output the current accumulator for the active period
    // Reset the timer
  }
}
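To make the bookkeeping in those comments concrete, here is a plain-Java sketch of the period arithmetic the DoFn would have to do by hand. The names (periodStarts, earliestActive) and the alignment of periods to a fixed epoch are my own assumptions, not Beam API:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class Periods {
    // Alignment origin for period starts (assumption; pick any fixed instant).
    static final Instant EPOCH = Instant.EPOCH;

    // Start instants of every period [start, start + size) that contains t,
    // newest first - the sliding-window assignment the DoFn reproduces by hand.
    static List<Instant> periodStarts(Instant t, Duration size, Duration period) {
        long p = period.toMillis();
        long lastStart =
            Math.floorDiv(t.toEpochMilli() - EPOCH.toEpochMilli(), p) * p;
        List<Instant> starts = new ArrayList<>();
        for (long s = lastStart; s > t.toEpochMilli() - size.toMillis(); s -= p) {
            starts.add(Instant.ofEpochMilli(s));
        }
        return starts;
    }

    // The aggregation to emit from: the oldest period still containing `now`,
    // i.e. the earliest start whose end has not yet passed.
    static Instant earliestActive(Instant now, Duration size, Duration period) {
        List<Instant> starts = periodStarts(now, size, period);
        return starts.get(starts.size() - 1);
    }
}
```

With size 3 days and period 1 day, an element at day 10 belongs to the periods starting at days 8, 9, and 10, and the active period to output is the one starting at day 8.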

I do have some reservations about this, because the outputs we are working so hard to suppress are not comparable to the outputs that are "replacing" them. I would be interested in learning more about the use case; it is possible there is a more straightforward way to express the result you are interested in.