4
votes

Our job/pipeline is writing the results of a ParDo transformation back out to GCS, i.e. using TextIO.Write.to("gs://...").
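
To give a concrete picture, the pipeline is roughly of this shape (a simplified sketch; the bucket paths and DoFn name are placeholders, and imports are omitted):

    // Read multiple files from GCS, transform each line, write back to GCS.
    Pipeline p = Pipeline.create(options);   // options is our DataflowPipelineOptions (details omitted)
    p.apply(TextIO.Read.from("gs://our-bucket/input/*"))
     .apply(ParDo.of(new OurTransformFn()))  // OurTransformFn is a stand-in for our DoFn
     .apply(TextIO.Write.to("gs://our-bucket/output/results"));
    p.run();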

We've noticed that when the job/pipeline completes, it leaves numerous 0 byte files in the output bucket.

The input to the pipeline comes from multiple files in GCS, so I'm assuming the results are sharded, which is fine.

But why do we get empty files?

[Screenshot: listing of the output bucket showing several 0-byte output files]

Do you explicitly specify the number of shards? – G B
Just wondering, as the other files are very small (I don't think Dataflow guarantees any balancing in file sizes). – G B
Ahh, the screenshot only shows a few of the files. There are more below, and larger. – Graham Polley

1 Answer

8
votes

It is likely that these empty shards are the result of an intermediate pipeline step that turned out to be somewhat sparse, so some pre-partitioned shards had no records in them.

E.g. if there was a GroupByKey right before the TextIO.Write and, say, the key space was sharded into the ranges [00, 01), [01, 02), ..., [fe, ff) (255 shards total), but all actual keys emitted from the input of this GroupByKey were in the ranges [34, 81) and [a3, b5), then 255 output files will be produced, but most of them will turn out empty. (This is a hypothetical partitioning scheme, just to give you the idea.)
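
In pipeline terms, the shape that produces this looks roughly like the following (a sketch; the DoFn names are hypothetical):

    // A GroupByKey feeding directly into the write: the runner pre-partitions
    // the key space, and ranges that receive no keys become empty output shards.
    PCollection<KV<String, String>> keyed =
        lines.apply(ParDo.of(new ExtractKeyFn()));   // hypothetical DoFn<String, KV<String, String>>
    keyed.apply(GroupByKey.<String, String>create())
         .apply(ParDo.of(new FormatGroupFn()))       // hypothetical DoFn<KV<String, Iterable<String>>, String>
         .apply(TextIO.Write.to("gs://our-bucket/output/results"));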

The rest of my answer will be in the form of Q&A.

Why produce empty files at all? If there's nothing to output, don't create the file! It's true that it would be technically possible to avoid producing them, e.g. by opening each output file lazily, only when its first element is written. AFAIK we normally don't do this because empty output files are usually not an issue, and an empty file is easier to understand than the absence of a file: it would be pretty confusing if, say, only the first of 50 shards turned out non-empty and you got a single output file named 00001-of-00050: you'd wonder what happened to the other 49.

But why not add a post-processing step to delete the empty files? In principle, we could add such a step, deleting the empty outputs and renaming the rest (to keep the xxxxx-of-yyyyy filepattern consistent), if empty outputs became a big issue.
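
If the empty files do get in your way today, a post-processing cleanup along these lines (a rough sketch using the google-cloud-storage Java client; the bucket name and prefix are placeholders) would remove them after the job finishes:

    import com.google.cloud.storage.Blob;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;

    // Delete 0-byte objects left under the output prefix (sketch, not production code).
    Storage storage = StorageOptions.getDefaultInstance().getService();
    for (Blob blob : storage.list("our-bucket",
            Storage.BlobListOption.prefix("output/")).iterateAll()) {
      if (blob.getSize() != null && blob.getSize() == 0) {
        blob.delete();
      }
    }

Note that this only deletes the empties: the remaining files keep their original shard numbers, which is fine as long as downstream consumers read the output with a wildcard rather than relying on a contiguous xxxxx-of-yyyyy sequence.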

Does the existence of empty shards signal a problem in my pipeline? A lot of empty shards might mean that the system-chosen sharding was suboptimal/uneven, and that we should have split the computation into fewer, more uniform shards. If this is a problem for you, could you give more details about your pipeline's output? For example, your screenshot shows that the non-empty outputs are also pretty small: do they contain just a handful of records? (If so, it may be difficult to achieve uniform sharding without knowing the data in advance.)
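
If uneven sharding does turn out to be a problem, one workaround is to explicitly fix the number of output shards on the write, at the cost of constraining the write's parallelism. A sketch, where results is a stand-in for your output PCollection<String>:

    // Force a fixed number of output shards (fewer, larger files).
    results.apply(TextIO.Write.to("gs://our-bucket/output/results")
                              .withNumShards(10));
    // Or, for a single output file:
    // results.apply(TextIO.Write.to("gs://our-bucket/output/results").withoutSharding());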

But the shards of my original input are not empty, so doesn't the sharding of the output mirror the sharding of the input? If your pipeline has GroupByKey (or derived) operations, there will be intermediate steps where the number of shards in the input and the output differ: e.g. an operation may consume 30 shards of input but produce 50 shards of output, or vice versa. A different number of shards in input and output is also possible in some other cases that don't involve GroupByKey.

TL;DR: If your overall output is correct, it's not a bug, but do tell us if it is a problem for you :)