How does one create tuple-based (i.e. count-based) sliding windows in Apache Beam? This is very easy to do in Flink:

DataStream.countWindowAll(long size, long slide)

But it is unclear from the Beam (or Dataflow) docs how to do this. Is it some combination of windows and triggers? If so, is it efficient?
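A small in-memory model makes the count-based semantics concrete. This is a plain-Java sketch, not Flink or Beam code. It assumes `countWindowAll(size, slide)` behaves like a trigger that fires every `slide` elements over the most recent `size` elements, so the first windows can hold fewer than `size` elements. `CountSlidingWindows` and `windows` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper illustrating count-based ("tuple-based") sliding windows.
// Assumption: mirrors Flink's countWindowAll(size, slide), i.e. every `slide`
// elements, emit the most recent `size` elements seen so far.
final class CountSlidingWindows {

    static <T> List<List<T>> windows(List<T> input, int size, int slide) {
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        Deque<T> buffer = new ArrayDeque<>(); // at most `size` most recent elements
        List<List<T>> result = new ArrayList<>();
        int sinceLastFire = 0;
        for (T element : input) {
            buffer.addLast(element);
            if (buffer.size() > size) {
                buffer.removeFirst(); // evict the oldest element
            }
            sinceLastFire++;
            if (sinceLastFire == slide) {
                result.add(new ArrayList<>(buffer)); // snapshot of the current window
                sinceLastFire = 0;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // size = 4, slide = 2
        System.out.println(windows(List.of(1, 2, 3, 4, 5, 6, 7), 4, 2));
    }
}
```

With `size = 4` and `slide = 2`, the input 1..7 yields `[[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]`. Element 7 stays buffered until the next firing.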

Answer

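Sliding windows are natively supported in Beam through the SlidingWindows class; see the windowing section of the Beam programming guide and the SlidingWindows Javadoc. Note that Beam's SlidingWindows are defined over event time (a window size plus a period), not over element counts.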

E.g.:

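import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<Foo> foos = ...;
// 5-minute windows, a new one starting every minute.
PCollection<Long> counts = foos
    .apply(Window.<Foo>into(
        SlidingWindows.of(Duration.standardMinutes(5))
                      .every(Duration.standardMinutes(1))))
    // Count.globally() inserts a default for empty input, which is only
    // supported in the global window, so use withoutDefaults() here.
    .apply(Combine.globally(Count.<Foo>combineFn()).withoutDefaults());
PCollection<String> formattedCounts = counts.apply(
    ParDo.of(new DoFn<Long, String>() {
      @ProcessElement
      public void process(ProcessContext c, BoundedWindow w) {
        // Beam injects the window that this count belongs to.
        c.output("Window: " + w + ", count: " + c.element());
      }
    }));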
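Because SlidingWindows works on timestamps, each element is assigned to every window whose time range covers it. The following simplified plain-Java model illustrates that assignment. It is not Beam's implementation: it assumes millisecond timestamps and a zero window offset, and `SlidingWindowAssignment` and `windowStarts` are hypothetical names. It shows why a 5-minute window with a 1-minute period places each element in five overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of time-based sliding window assignment.
// Assumptions: timestamps in epoch milliseconds, window offset of zero.
final class SlidingWindowAssignment {

    // Returns the start of every window [start, start + sizeMillis) that contains the timestamp.
    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long periodMillis) {
        // Latest window start at or before the timestamp, aligned to the period.
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        List<Long> starts = new ArrayList<>();
        // Step back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 5-minute windows starting every minute; element at t = 7.5 minutes.
        for (long start : windowStarts(7 * minute + minute / 2, 5 * minute, minute)) {
            System.out.printf("[%d min, %d min)%n", start / minute, (start + 5 * minute) / minute);
        }
    }
}
```

For an element at 7.5 minutes, the model yields windows starting at 7, 6, 5, 4 and 3 minutes. Because membership depends only on timestamps, the number of elements per window varies with the data rate, which is the key difference from a count-based window.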

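Triggering is a separate dimension of the problem: it controls when the data for a particular window is considered "complete enough" to apply the aggregation and emit a result. By default, Beam emits a window's result once, when the watermark passes the end of the window. See the triggers section of the Beam programming guide.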