24 votes

A data lake I am working with (df) has 2 TB of data and 20,000 files. I'd like to compact the data set into 2,000 files of 1 GB each.

If you run df.coalesce(2000) and write out to disk, the data lake contains 1.9 TB of data.

If you run df.repartition(2000) and write out to disk, the data lake contains 2.6 TB of data.
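For reference, the two writes look roughly like this (assuming Parquet output; the paths are placeholders):

df.coalesce(2000).write.parquet("/some/path/coalesced")
df.repartition(2000).write.parquet("/some/path/repartitioned")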

Each file in the repartition() data lake is exactly 0.3 GB larger than expected (they’re all 1.3 GB files instead of 1 GB files).

Why does the repartition() method increase the size of the overall data lake?

There is a related question that discusses why the size of a data lake increases after aggregations are run. The answer says:

In general columnar storage formats like Parquet are highly sensitive when it comes to data distribution (data organization) and cardinality of individual columns. The more organized is data and the lower is cardinality the more efficient is the storage.

Is the coalesce() algorithm providing data that's more organized... I don't think so...

I don't think the other question answers my question.

Can you count the number of files generated by coalesce(2000)? If it is less than 2000 we can draw some conclusions. – loneStar
@Achyuth - coalesce(2000) generated 2000 files. – Powers
@Powers repartition might generate equal-size files, while coalesce might generate random-size files. – loneStar

1 Answer

20 votes

Disclaimer:

This answer contains primarily speculations. A detailed explanation of this phenomenon might require in-depth analysis of the input and output (or at least their respective metadata).

Observations:

  1. Entropy effectively bounds the performance of the strongest lossless compression possible - Wikipedia - Entropy (information theory).
  2. Both persistent columnar formats and the internal Spark SQL representation transparently apply different compression techniques (like run-length encoding or dictionary encoding) to reduce the memory footprint of the stored data.

    Additionally, on-disk formats (including plain text data) can be explicitly compressed using general-purpose compression algorithms - it is not clear if this is the case here.

  3. Compression (explicit or transparent) is applied to blocks of data (typically partitions, but smaller units can be used).

  4. Based on 1), 2) and 3) we can assume that the average compression rate will depend on the distribution of the data in the cluster (see the sketch below). We should also note that the final result can be non-deterministic if the upstream lineage contains wide transformations.
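As a rough illustration of point 4, here is a minimal sketch (assuming a spark-shell style SparkSession named spark; the /tmp output paths and the toy data are made up for this example) that writes the same rows with two different orderings and lets you compare the resulting Parquet footprints:

import spark.implicits._
import org.apache.spark.sql.functions.rand

// one million rows with only ten distinct keys - very low entropy
val sample = (1 to 1000000).map(i => ("key" + (i % 10), i)).toDF("k", "v")

// identical keys end up next to each other, so dictionary/RLE-style encodings work well
sample.sort("k").write.mode("overwrite").parquet("/tmp/low_entropy")

// keys are interleaved pseudo-randomly, so the same encodings are much less effective
sample.orderBy(rand()).write.mode("overwrite").parquet("/tmp/high_entropy")

// comparing the two directories on disk (e.g. with du -sh) typically shows
// a noticeably larger footprint for the shuffled copy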

Possible impact of coalesce vs. repartition:

In general coalesce can take two paths:

  • Escalate through the pipeline up to the source - the most common scenario.
  • Propagate to the nearest shuffle.

In the first case we can expect that the compression rate will be comparable to the compression rate of the input. However there are some cases where we can achieve a much smaller final output. Let's imagine a degenerate dataset:

import spark.implicits._ // needed for .toDF outside of spark-shell

val df = sc.parallelize(
  Seq("foo", "foo", "foo", "bar", "bar", "bar"),
  6
).toDF

If a dataset like this was written to disk there would be no potential for compression - each value would have to be written as-is:

import org.apache.spark.sql.functions.spark_partition_id

df.withColumn("pid", spark_partition_id()).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  1|
|  foo|  2|
|  bar|  3|
|  bar|  4|
|  bar|  5|
+-----+---+

In other words we need roughly 6 * 3 bytes giving 18 bytes in total.

However if we coalesce

df.coalesce(2).withColumn("pid", spark_partition_id()).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  0|
|  foo|  0|
|  bar|  1|
|  bar|  1|
|  bar|  1|
+-----+---+

we can, for example, apply RLE with a small int as the count, and store each partition in 3 + 1 bytes, giving 8 bytes in total.
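To make that byte counting concrete, here is a toy run-length encoder in plain Scala - not Parquet's actual implementation, just a sketch of the idea:

def rle[A](xs: Seq[A]): Seq[(A, Int)] =
  xs.foldLeft(List.empty[(A, Int)]) {
    case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest // extend the current run
    case (acc, x)                      => (x, 1) :: acc      // start a new run
  }.reverse

rle(Seq("foo"))               // List((foo,1)) - one pair per single-value partition
rle(Seq("foo", "foo", "foo")) // List((foo,3)) - a single (value, count) pair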

This is of course a huge oversimplification, but it shows how preserving a low-entropy input structure and merging blocks can result in a lower memory footprint.

The second coalesce scenario is less obvious, but there are cases where entropy can be reduced by an upstream process (think, for example, of window functions) and preserving such structure will be beneficial.
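A hedged sketch of that second scenario, reusing the toy df from above (the window specification and the /tmp path are made up for illustration): a window partitioned by value already groups equal values together after its shuffle, and coalescing afterwards preserves that grouping instead of redistributing the rows again:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val w = Window.partitionBy("value").orderBy("value")

df.withColumn("rn", row_number().over(w))
  .coalesce(2)
  .write.mode("overwrite").parquet("/tmp/windowed_then_coalesced")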

What about repartition?

Without a partitioning expression, repartition applies RoundRobinPartitioning (implemented as HashPartitioning with a pseudo-random key based on the partition id). As long as the hash function behaves sensibly, such redistribution should maximize the entropy of the data and, as a result, decrease the achievable compression rate.
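Continuing the toy example, this is easy to observe (the exact placement of rows depends on the Spark version and hash function, so treat the output as illustrative):

df.repartition(2).withColumn("pid", spark_partition_id()).orderBy("pid").show
// each partition now typically holds a mix of foo and bar,
// so the long per-partition runs that made RLE effective are gone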

Conclusion:

coalesce shouldn't provide any specific benefits on its own, but it can preserve existing properties of the data distribution - this can be advantageous in certain cases.

repartition, due to its nature, will on average make things worse, unless the entropy of the data is already maximized (a scenario where things improve is possible, but highly unlikely on a non-trivial dataset).

Finally, repartition with a partitioning expression, or repartitionByRange, should decrease entropy and improve compression rates.
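For completeness, those two options would look roughly like this (assuming Spark 2.3+ for repartitionByRange; the output paths are placeholders):

import org.apache.spark.sql.functions.col

df.repartition(2, col("value")).write.mode("overwrite").parquet("/tmp/by_value")
df.repartitionByRange(2, col("value")).write.mode("overwrite").parquet("/tmp/by_range")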

Note:

We should also keep in mind that columnar formats usually decide on a specific compression / encoding method (or lack thereof) based on runtime statistics. So even if the set of rows in a particular block is fixed, changing the order of the rows can lead to different outcomes.
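For example, keeping the same rows per partition but sorting within each partition before writing can change which encodings the writer picks, and therefore the resulting file sizes (the path is a placeholder):

df.sortWithinPartitions("value").write.mode("overwrite").parquet("/tmp/sorted_within_partitions")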