Background
We have a pipeline that starts by receiving messages from PubSub, each with the name of a file. These files are exploded to line level, parsed to JSON object nodes and then sent to an external decoding service (which decodes some encoded data). Object nodes are eventually converted to Table Rows and written to BigQuery.
It appeared that Dataflow was not acknowledging the PubSub messages until they arrived at the decoding service. The decoding service is slow, resulting in a backlog when many messages are sent at once. This means that lines associated with a PubSub message can take some time to arrive at the decoding service. As a result, PubSub was receiving no acknowledgement and resending the message. My first attempt to remedy this was adding an attribute to each PubSub message that is passed to the reader using withIdAttribute(). However, on testing, this only prevented duplicates that arrived close together.
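To illustrate what I mean by "duplicates that arrived close together", here is a toy model in plain Java. It is not Dataflow's implementation. It only assumes that deduplication remembers each message ID for a bounded time; the class name and the 60-second retention value are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only (hypothetical class, NOT Dataflow code): an ID is remembered
// for a fixed retention period, and repeats inside that period are dropped.
class WindowedDeduplicator {
    private final long retentionMillis;
    private final Map<String, Long> acceptedAt = new HashMap<>();

    WindowedDeduplicator(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Returns true if the message is processed, false if it is dropped
    // as a duplicate of an ID accepted within the retention period.
    boolean accept(String id, long arrivalMillis) {
        Long previous = acceptedAt.get(id);
        if (previous != null && arrivalMillis - previous <= retentionMillis) {
            return false;
        }
        acceptedAt.put(id, arrivalMillis);
        return true;
    }

    public static void main(String[] args) {
        WindowedDeduplicator dedup = new WindowedDeduplicator(60_000L); // assumed value
        System.out.println(dedup.accept("file-a", 0L));       // first delivery
        System.out.println(dedup.accept("file-a", 30_000L));  // redelivery soon after
        System.out.println(dedup.accept("file-a", 300_000L)); // redelivery much later
    }
}
```

In this model the second delivery is dropped but the third one gets through, which matches what I saw in testing: redeliveries that arrive long after the original are not caught.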
My second attempt was to add a fusion breaker (example) after the PubSub read. This simply performs a needless GroupByKey and then ungroups, the idea being that the GroupByKey forces Dataflow to acknowledge the PubSub message.
The Problem
The fusion breaker discussed above works in that it prevents PubSub from resending messages, but I am finding that this GroupByKey outputs more elements than it receives (see image).
To try to diagnose this, I have removed parts of the pipeline to get a simple pipeline that still exhibits this behavior. The behavior remains even when:
- PubSub is replaced by some dummy transforms that send out a fixed list of messages with a slight delay between each one.
- The Writing transforms are removed.
- All Side Inputs/Outputs are removed.
The behavior I have observed is:
- Some number of the received messages pass straight through the GroupByKey.
- After a certain point, messages are 'held' by the GroupByKey (presumably due to the backlog after the GroupByKey).
- These messages eventually exit the GroupByKey (in groups of size one).
- After a short delay (about 3 minutes), the same messages exit the GroupByKey again (still in groups of size one). This may happen several times (I suspect it is proportional to the time they spend waiting to enter the GroupByKey).
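For reference, this is roughly how the repeats can be confirmed from the logs rather than from the step counters. It is a minimal sketch: the log line format ("Fusion break out: " followed by the element) and the class name are assumptions for illustration, not the actual output of my logger.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: counts how often each element appears in log lines
// that contain the given prefix. The log format is an assumption.
class DuplicateLogCounter {
    static Map<String, Integer> countByElement(List<String> logLines, String prefix) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            int at = line.indexOf(prefix);
            if (at < 0) {
                continue; // line belongs to a different logger or step
            }
            String element = line.substring(at + prefix.length()).trim();
            counts.merge(element, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "INFO Fusion break in: file-1",
                "INFO Fusion break out: file-1",
                "INFO Fusion break out: file-2",
                "INFO Fusion break out: file-1");
        // Report only the elements that left the GroupByKey more than once.
        countByElement(lines, "Fusion break out: ").forEach((element, count) -> {
            if (count > 1) {
                System.out.println(element + " seen " + count + " times");
            }
        });
    }
}
```

Any element with a count above one on the "out" side, but exactly one on the "in" side, was emitted by the GroupByKey more than once.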
Example job id is 2017-10-11_03_50_42-6097948956276262224. I have not run the pipeline on any other runner.
The Fusion Breaker is below:
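import java.util.Random;

import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

@Slf4j
public class FusionBreaker<T> extends PTransform<PCollection<T>, PCollection<T>> {

    // Log in, window, key randomly, group, then unwrap the groups and log out.
    @Override
    public PCollection<T> expand(PCollection<T> input) {
        return group(window(input.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break in")))))
                .apply("Getting iterables after breaking fusion", Values.create())
                .apply("Flattening iterables after breaking fusion", Flatten.iterables())
                .apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break out")));
    }

    // Global window that fires a pane for every element and discards fired panes.
    private PCollection<T> window(PCollection<T> input) {
        return input.apply("Windowing before breaking fusion", Window.<T>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes());
    }

    // Assigns a random key to each element and groups by that key.
    private PCollection<KV<Integer, Iterable<T>>> group(PCollection<T> input) {
        return input.apply("Keying with random number", ParDo.of(new RandomKeyFn<>()))
                .apply("Grouping by key to break fusion", GroupByKey.create());
    }

    private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
        private Random random;

        @Setup
        public void setup() {
            random = new Random();
        }

        @ProcessElement
        public void processElement(ProcessContext context) {
            context.output(KV.of(random.nextInt(), context.element()));
        }
    }
}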
The PassthroughLoggers simply log the elements passing through (I use these to confirm that elements are indeed repeated, rather than there being an issue with the counts).
I suspect this is something to do with windows/triggers, but my understanding is that elements should never be repeated when .discardingFiredPanes() is used - regardless of the windowing setup. I have also tried FixedWindows with no success.
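To spell out my understanding of the two accumulation modes, here is a toy model in plain Java. It is not Beam code: it models a single key in a single window with a trigger that fires after every element, and the class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only (hypothetical, NOT Beam code): one key, one window,
// and a trigger that fires a pane after every arriving element.
class PaneModel {
    static List<List<String>> firePanes(List<String> arrivals, boolean accumulating) {
        List<List<String>> panes = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String element : arrivals) {
            buffer.add(element);
            panes.add(new ArrayList<>(buffer)); // the trigger fires here
            if (!accumulating) {
                buffer.clear(); // discarding mode drops what was already emitted
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        List<String> arrivals = Arrays.asList("a", "b", "c");
        System.out.println(firePanes(arrivals, false)); // [[a], [b], [c]]
        System.out.println(firePanes(arrivals, true));  // [[a], [a, b], [a, b, c]]
    }
}
```

In this model only the accumulating mode ever re-emits an element, which is why I did not expect repeats with .discardingFiredPanes().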