I have an Apache Beam pipeline which runs on Google Cloud Dataflow. This is a streaming pipeline which receives input messages from Google Cloud Pub/Sub; each message is basically a JSON array of elements to process.

Roughly speaking, the pipeline has these steps:

  1. Deserialize the message into a PCollection<List<T>>.
  2. Split (or explode) the array into a PCollection<T>.
  3. A few processing steps: some elements finish before others, and some elements are cached so they simply skip to the end without much processing at all.
  4. Flatten all outputs and apply a GroupByKey (this is the problem step): it turns the PCollection back into a PCollection<List<T>>, but it doesn't wait for all the elements.
  5. Serialize the result to publish it as a Pub/Sub message.

I cannot get the last GroupByKey to group all the elements that were received together. The published message is missing the elements that had to be processed and took longer than the ones that skipped to the end.

I think this would be straightforward to solve if I could write a custom data-driven trigger. Or even if I could dynamically set the trigger AfterPane.elementCountAtLeast() from a customized WindowFn.

It doesn't seem that I can write a custom trigger. But is it possible to somehow set the trigger dynamically for each window?
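For illustration, the static form of what I'm after looks something like the snippet below; the hard-coded count of 10 is just a placeholder (in my case the count is only known per message, which is exactly the problem), and elements stands for the keyed PCollection further down.

// Sketch only: a count-based trigger with a fixed, known element count.
PCollection<KV<Integer, Integer>> triggered = elements
    .apply(Window.<KV<Integer, Integer>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());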

--

Here is a simplified version of the pipeline I am working on.

I have simplified the input from an array of objects T into a simple array of Integer. I have simulated the keys (or IDs) for these integers. Normally they would be part of the objects.

I also simplified the slow processing step (which really is several steps) into a single step with an artificial delay.
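Roughly, that delay step is just a pass-through DoFn that sleeps before emitting, along these lines (simplified sketch; the real one also emits errors to a side output):

// Simulates the slow processing: sleep for a bit, then pass the element through unchanged.
static class DummyDelayDoFn extends DoFn<KV<Integer, Integer>, KV<Integer, Integer>> {
    @ProcessElement
    public void processElement(ProcessContext c) throws InterruptedException {
        Thread.sleep(1000);  // artificial delay
        c.output(c.element());
    }
}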

(complete example gist https://gist.github.com/naringas/bfc25bcf8e7aca69f74de719d75525f2 )

PCollection<String> queue = pipeline
    .apply("ReadQueue", PubsubIO.readStrings().fromTopic(topic))
    .apply(Window
        .<String>into(FixedWindows.of(Duration.standardSeconds(1)))
        .withAllowedLateness(Duration.standardSeconds(3))
        .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
        .discardingFiredPanes());

TupleTag<List<KV<Integer, Integer>>> tagDeserialized = new TupleTag<List<KV<Integer, Integer>>>() {};
TupleTag<Integer> tagDeserializeError = new TupleTag<Integer>() {};
PCollectionTuple imagesInputTuple = queue
    .apply("DeserializeJSON", ParDo.of(new DeserializingFn()).withOutputTags(tagDeserialized, TupleTagList.of(tagDeserializeError)));

/*
This is where I think I must adjust the custom window strategy and set the customized dynamic trigger
*/
PCollection<KV<Integer, Integer>> images = imagesInputTuple.get(tagDeserialized)
    /* I have tried many things
    .apply(Window.<List<KV<Integer, Integer>>>into(new GlobalWindows()))
    */
    .apply("Flatten into timestamp", ParDo.of(new DoFn<List<KV<Integer, Integer>>, KV<Integer, Integer>>() {
        // Flatten and output into same ts
        // like Flatten.Iterables() but I set the output window
        @ProcessElement
        public void processElement(@Element List<KV<Integer, Integer>> input, OutputReceiver<KV<Integer, Integer>> out, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
            Instant timestamp = w.maxTimestamp();
            for (KV<Integer, Integer> el : input) {
                out.outputWithTimestamp(el, timestamp);
            }
        }
    }))
    .apply(Window.<KV<Integer, Integer>>into(new GlobalWindows()));

TupleTag<KV<Integer, Integer>> tagProcess = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagSkip = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple preproc = images
    .apply("PreProcessingStep", ParDo.of(new SkipOrNotDoFn()).withOutputTags(tagProcess, TupleTagList.of(tagSkip)));

TupleTag<KV<Integer, Integer>> tagProcessed = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagError = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple processed = preproc.get(tagProcess)
    .apply("ProcessingStep", ParDo.of(new DummyDelasyDoFn).withOutputTags(tagProcessed, TupleTagList.of(tagError)));

/* Here, at the "end",
the elements get grouped back together:
first: join into a PCollectionList and flatten it
second: GroupByKey, which should wait for all elements but doesn't
lastly: serialize and publish (in this case just print out)
*/
PCollection<String> end = PCollectionList.of(preproc.get(tagSkip)).and(processed.get(tagProcessed))
    .apply("FlattenUpsert", Flatten.pCollections())
    //
    .apply("GroupByParentId", GroupByKey.create())
    .apply("GroupedValues", Values.create())
    .apply("PublishSerialize", ParDo.of(
        new DoFn<Object, String>() {
            @ProcessElement
            public void processElement(ProcessContext pc) {
                String output = GSON.toJson(pc.element());
                LOG.info("DONE: {}", output);
                pc.output(output);
            }
        }));
// "send the string to pubsub" goes here
Can you look at adding an artificial key to a batch of records and then grouping by that? In that case the GroupByKey will wait for all the events with that id. – Jayadeep Jayaraman
The GroupByKey already uses an id (defined by the input data). – naringas
Can you share some sample input and output data and the problem you are seeing? – Jayadeep Jayaraman
To summarize, you want your rebuilt PCollection<List<T>> (step 4) to have the same number of elements as the first one, before the split (step 1)? – guillaume blaquiere
Yes @guillaumeblaquiere – naringas

1 Answer

I played around a little bit with stateful pipelines. As you'd like to use data-driven triggers or AfterPane.elementCountAtLeast(), I assume you know the number of elements that make up the message (or at least that it does not change per key), so I defined NUM_ELEMENTS = 10 in my case.

The main idea of my approach is to keep track of the number of elements seen so far for a particular key. Notice that I had to merge PreProcessingStep and ProcessingStep into a single one for an accurate count. I understand this is just a simplified example, so I don't know how that would translate to the real scenario.
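Wiring-wise, that means the two ParDos collapse into a single stateful one, roughly like this (the DoFn name here is just a placeholder for this sketch):

// PreProcessingStep + ProcessingStep merged into one stateful ParDo (sketch).
PCollectionTuple processed = images
    .apply("ProcessAndBuffer", ParDo.of(new BufferUntilCompleteDoFn())
        .withOutputTags(tagProcessed, TupleTagList.of(tagError)));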

In the stateful ParDo I defined two state variables, one BagState with all integers seen and a ValueState to count the number of errors:

// A state bag holding all elements seen for that key
@StateId("elements_seen")
private final StateSpec<BagState<Integer>> elementSpec =
      StateSpecs.bag();

// A state cell holding error count
@StateId("errors")
private final StateSpec<ValueState<Integer>> errorSpec =
      StateSpecs.value(VarIntCoder.of());

Then we process each element as usual but we don't output anything yet unless it's an error. In that case we update the error counter before emitting the element to the tagError side output:

errors.write(firstNonNull(errors.read(), 0) + 1);
is_error = true;
output.get(tagError).output(input);

We update the count and, for successfully processed or skipped elements (i.e. !is_error), write the newly observed element into the BagState:

int count = firstNonNull(Iterables.size(state.read()), 0) + firstNonNull(errors.read(), 0);

if (!is_error) {
   state.add(input.getValue());
   count += 1;
}

Then, if the sum of successfully processed elements and errors is equal to NUM_ELEMENTS (we are simulating a data-driven trigger here), we flush all the items from the BagState:

if (count >= NUM_ELEMENTS) {
   Iterable<Integer> all_elements = state.read();
   Integer key = input.getKey();

   for (Integer value : all_elements) {
      output.get(tagProcessed).output(KV.of(key, value));
   }
}

Note that here we could already group the values and emit just a single KV<Integer, Iterable<Integer>> instead. I just used a for loop to avoid changing other steps downstream.
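For reference, that variant would look roughly like this, assuming a hypothetical tagGrouped of type TupleTag<KV<Integer, Iterable<Integer>>>:

// Alternative: emit the buffered values once per key, already grouped
// (tagGrouped is hypothetical and not part of the snippets above).
if (count >= NUM_ELEMENTS) {
   output.get(tagGrouped).output(KV.of(input.getKey(), state.read()));
}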

With this, I publish a message such as:

gcloud pubsub topics publish streamdemo --message "[1,2,3,4,5,6,7,8,9,10]"

Whereas before I got:

INFO: DONE: [4,8]

Now I get:

INFO: DONE: [1,2,3,4,5,6,8,9,10]

Element 7 is not present, as it is the one that simulates errors.

Tested with DirectRunner and 2.16.0 SDK. Full code here.

Let me know if that works for your use case; keep in mind that I only did some minor tests.