This is in reference to Apache Beam SDK Version 2.2.0.

I'm trying to use AfterPane.elementCountAtLeast(...) but have had no success so far. What I want looks a lot like the question "Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn", but it needs to be adapted to 2.2.0. Ultimately I just need a simple OR: a file is written after X elements or after Y time has passed, whichever comes first. I intend to set the time very high so that in most cases the write is driven by the element count, and the duration only applies during periods of very low message volume.

Using "GCP Dataflow 2.0 PubSub to GCS" as a reference, here's what I've tried:

String bucketPath =
    String.format("gs://%s/%s", 
        options.getBucketName(), 
        options.getDestinationDirName());

PCollection<String> windowedValues = stringMessages
    .apply("Create windows",
        Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(250)))
        .discardingFiredPanes());

windowedValues
    .apply("Write to GCS",
        TextIO
            .write()
            .to(bucketPath)
            .withNumShards(options.getNumShards())
            .withWindowedWrites());

Here stringMessages is a PCollection read from an Avro-encoded Pub/Sub subscription. Some unpacking happens upstream to convert the events to strings, but there is no merging, partitioning, or grouping, just transforms.

Element count is hard coded at 250 just for PoC. Once it is proven, it will likely be cranked up to the 10s or 100s of thousands range.

The Problem

This implementation produces text files of varying lengths. The file lengths start very high (1000s of elements) when the job first starts up (presumably while it processes backlogged data) and then stabilize at some point. I've tried setting 'numShards' to 1 and to 10. At 1, the element count of the written files stabilizes at 600; at 10, it stabilizes at 300.

What am I missing here?

As a side note, this is only step 1. Once I figure out writing using element count, I still need to figure out writing these files as compressed json (.json.gz) as opposed to plain-text files.

Currently, data-driven triggers such as AfterPane.elementCountAtLeast() only support firing after at least a certain number of data elements. Firing exactly upon a certain element count is not supported. – George
This is why firing based on elementCountAtLeast is not as precise as you expected. You might consider using processing-time triggers and adapting the duration to the desired file length. More detail can be found in the Apache Beam Programming Guide: beam.apache.org/documentation/programming-guide/#triggers – George
Thank you for the response, George. It is good to know that this is not currently supported. Unfortunately, our unbounded event stream varies quite a bit with the time of day, so a fixed processing-time trigger produces variable file sizes. Is there any more information available about how AfterPane.elementCountAtLeast() determines when to fire a pane? – ljhennessy
Related information can be found on the Apache documentation page referred to above. By definition, elementCountAtLeast() sets a lower bound but, by design, no upper bound. Documentation cannot alter this definition. Unfortunately, the functionality offered by elementCountAtLeast() does not match your needs in this context. – George
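
To illustrate the "at least" semantics described in these comments, here is a toy model in plain Java with no Beam dependency. It assumes, purely for illustration, that elements arrive in batches and that the threshold is checked only after a whole batch has been buffered. The class and method names are hypothetical, and this is not a description of how any particular runner is implemented.

```java
import java.util.ArrayList;
import java.util.List;

public class AtLeastTriggerSketch {

    /**
     * Toy model of an "element count at least N" trigger.
     * Assumption: the count is checked only after each incoming batch,
     * so a pane can hold more than the threshold, never fewer.
     * Returns the size of every pane that fired.
     */
    public static List<Integer> paneSizes(int[] batchSizes, int threshold) {
        List<Integer> panes = new ArrayList<>();
        int buffered = 0;
        for (int batch : batchSizes) {
            buffered += batch;
            if (buffered >= threshold) {
                panes.add(buffered);
                buffered = 0; // mimics discarding fired panes
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        int[] batches = {100, 100, 100, 400, 50, 50, 200};
        // prints [300, 400, 300]
        System.out.println(paneSizes(batches, 250));
    }
}
```

With a threshold of 250, none of the panes in this model contains exactly 250 elements. The overshoot depends only on how large the incoming batches happen to be, which is consistent with a lower bound that has no matching upper bound.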

1 Answer

Posting what I learned for reference by others.

What was not clear to me when I wrote this is the following from the Apache Beam Documentation:

Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis

With this knowledge, I rethought my pipeline a bit. From the FileIO documentation under Writing files -> How many shards are generated per pane:

Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.

So I decided to use FileIO's writeDynamic to perform the writes and specify withNumShards in order to get the implicit GroupByKey. The final result looks like this:

// Keep the current windowing and fire a pane after at least 2000 elements
// or after windowDurationSeconds of processing time, whichever comes first.
PCollection<String> windowedValues = validMessageStream.apply(Window
        .<String>configure()
        .triggering(Repeatedly.forever(AfterFirst.of(
                AfterPane.elementCountAtLeast(2000),
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                        Duration.standardSeconds(windowDurationSeconds)))))
        .discardingFiredPanes());

// Setting a fixed shard count adds the GroupByKey mentioned above.
windowedValues.apply(FileIO.<String, String>writeDynamic()
        .by(Event::getKey)
        .via(TextIO.sink())
        .to("gs://data_pipeline_events_test/events/")
        .withDestinationCoder(StringUtf8Coder.of())
        .withNumShards(1)
        .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
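
For anyone who wants to reason about the "X elements OR Y time" behaviour outside of Beam, the AfterFirst composition in the trigger above can be approximated with a small plain-Java buffer. This is only a sketch under stated assumptions: the class CountOrTimeBuffer and its methods are hypothetical, time is passed in explicitly as milliseconds, and elements are added one at a time, so the count fires exactly at the threshold here, whereas a real runner may overshoot.

```java
import java.util.ArrayList;
import java.util.List;

public class CountOrTimeBuffer {
    private final int maxElements;
    private final long maxDelayMillis;
    private final List<String> buffer = new ArrayList<>();
    private long firstElementMillis;

    public CountOrTimeBuffer(int maxElements, long maxDelayMillis) {
        this.maxElements = maxElements;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Buffers one element; returns the fired pane, or an empty list. */
    public List<String> add(String element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementMillis = nowMillis; // the delay starts at the first element in the pane
        }
        buffer.add(element);
        return fireIfReady(nowMillis);
    }

    /** Stands in for a timer callback when no element arrives. */
    public List<String> tick(long nowMillis) {
        return fireIfReady(nowMillis);
    }

    // Fires when either condition holds, then starts a fresh pane.
    private List<String> fireIfReady(long nowMillis) {
        boolean countReached = buffer.size() >= maxElements;
        boolean delayElapsed = !buffer.isEmpty()
                && nowMillis - firstElementMillis >= maxDelayMillis;
        if (!countReached && !delayElapsed) {
            return new ArrayList<>();
        }
        List<String> pane = new ArrayList<>(buffer);
        buffer.clear(); // discard the fired pane
        return pane;
    }

    public static void main(String[] args) {
        CountOrTimeBuffer b = new CountOrTimeBuffer(3, 1000);
        b.add("a", 0);
        b.add("b", 10);
        System.out.println(b.add("c", 20)); // prints [a, b, c] (count reached)
        b.add("d", 30);
        System.out.println(b.tick(500));    // prints [] (neither condition met)
        System.out.println(b.tick(1030));   // prints [d] (delay elapsed)
    }
}
```

Setting the delay very high, as described in the question, means the count condition decides almost every pane and the delay only acts as a fallback during quiet periods.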