We found a very strange difference between Dataflow SDK 1.9 and 2.0/2.1 for a very simple pipeline.
We have CoGroupByKey step that joins two PCollections by their keys and outputs two PCollections (via TupleTags). For instance, one PCollection may contain {"str1", "str2"} and the other may contains {"str3"}.
These two PCollections are written to GCS (at different locations), and their union (basically, the PCollection produced by applying Flatten on the two PCollections) would be used by subsequent steps in a pipeline. Using the previous example, we will store {"str1", "str2"} and {"str3"} in GCS under respective locations, and the pipeline will further transform their union (Flattened PCollection) {"str1", "str2", "str3"}, and so on.
In Dataflow SDK 1.9, that is exactly what is happening, and we've built our pipelines around this logic. As we were slowly migrating to 2.0/2.1, we noticed that this behavior is no longer observed. Instead, all the steps followed by the Flatten step are run correctly and as expected, but those two PCollections (being Flattened) are no longer written to GCS as if they are nonexistent. In the execution graph though, the steps are shown, and this is very strange to us.
We were able to reproduce this issue reliably so that we can share the data and code as an example. We have two text files stored in GCS:
data1.txt:
k1,v1
k2,v2
data2.txt:
k2,w2
k3,w3
We will read these two files to create two PCollections, a PC for each file.
We'll parse each line to create KV<String, String>
(so the keys are k1, k2, k3
in this example).
We then apply CoGroupByKey and produce PCollection to be output to GCS. Two PCollections are produced after the CoGroupByKey step depending on the number of values associated with each key (it's a contrived example, but it is to demonstrate the issue we are experiencing) -- whether the number is even or odd. So one of the PCs will contain keys "k1, " and "k3" (with some value strings appended to them, see the code below) as they have one value each and the other will contain a single key "k2" as it has two values (found in each file).
These two PCs are written to GCS at different locations, and the flattened PC of the two will also be written to GCS (but it could have been further transformed).
The three output files are expected to contain the following contents (rows may not be in order):
output1:
k2: [v2],(w2)
output2:
k3: (w3)
k1: [v1]
outputMerged:
k3: (w3)
k2: [v2],(w2)
k1: [v1]
This is exactly what we see (and expected)in Dataflow SDK 1.9.
In 2.0 and 2.1 however, output1 and output2 come out to be empty (and the TextIO steps are not even executed as if there are no elements being input to them; we verified this by adding a dummy ParDo in-between, and it's not invoked at all).
This makes us very curious as to why suddenly this behavior change was made between 1.9 and 2.0/2.1, and what would be the best way for us to achieve what we have been doing with 1.9. Specifically, we produce output1/2 for archiving purposes, while we flatten the two PCs to transform the data further and produce another output.
Here is Java Code you can run (you will have to import properly, change the bucket name, and set Options properly, etc.).
Working code for 1.9:
//Dataflow SDK 1.9 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.Read.from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/outputMerged").withNumShards(1));
output1.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output1").withNumShards(1));
output2.apply(TextIO.Write.to(GcsPath.EXPERIMENT_BUCKET + "/test-job-1.9/output2").withNumShards(1));
pipeline.run();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.sideOutput(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
Working Code for 2.0/2.1:
// Dataflow SDK 2.0 and 2.1 compatible.
public class TestJob {
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, String>> data1 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data1.txt")).apply(ParDo.of(new doFn()));
PCollection<KV<String, String>> data2 =
pipeline.apply(TextIO.read().from(GcsPath.EXPERIMENT_BUCKET + "/data2.txt")).apply(ParDo.of(new doFn()));
TupleTag<String> inputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> inputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag1 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
TupleTag<String> outputTag2 = new TupleTag<String>() {
private static final long serialVersionUID = 1L;
};
PCollectionTuple tuple = KeyedPCollectionTuple.of(inputTag1, data1).and(inputTag2, data2)
.apply(CoGroupByKey.<String>create()).apply(ParDo.of(new doFn2(inputTag1, inputTag2, outputTag2))
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection<String> output1 = tuple.get(outputTag1);
PCollection<String> output2 = tuple.get(outputTag2);
PCollection<String> outputMerged = PCollectionList.of(output1).and(output2).apply(Flatten.<String>pCollections());
outputMerged.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/outputMerged").withNumShards(1));
output1.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output1").withNumShards(1));
output2.apply(TextIO.write().to(GcsPath.EXPERIMENT_BUCKET + "/test-job-2.1/output2").withNumShards(1));
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
}
static class doFn2 extends DoFn<KV<String, CoGbkResult>, String> {
private static final long serialVersionUID = 1L;
final TupleTag<String> inputTag1;
final TupleTag<String> inputTag2;
final TupleTag<String> outputTag2;
public doFn2(TupleTag<String> inputTag1, TupleTag<String> inputTag2, TupleTag<String> outputTag2) {
this.inputTag1 = inputTag1;
this.inputTag2 = inputTag2;
this.outputTag2 = outputTag2;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String key = c.element().getKey();
List<String> values = new ArrayList<String>();
int numValues = 0;
for (String val1 : c.element().getValue().getAll(inputTag1)) {
values.add(String.format("[%s]", val1));
numValues++;
}
for (String val2 : c.element().getValue().getAll(inputTag2)) {
values.add(String.format("(%s)", val2));
numValues++;
}
final String line = String.format("%s: %s", key, Joiner.on(",").join(values));
if (numValues % 2 == 0) {
c.output(line);
} else {
c.output(outputTag2, line);
}
}
}
static class doFn extends DoFn<String, KV<String, String>> {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] tokens = c.element().split(",");
c.output(KV.of(tokens[0], tokens[1]));
}
}
}
Also, in case it is useful, the execution graph looks like this. (And for Google engineers, Job IDs are also specified).
With 1.9 (job id 2017-09-29_14_35_42-15149127992051688457):
With 2.1 (job id 2017-09-29_14_31_59-991964669451027883):
TextIO.Write 2,3 are not producing any output under 2.0/2.1. Flatten, and its subsequent step works fine.