Disclaimer:
This answer contains primarily speculation. A detailed explanation of this phenomenon would require an in-depth analysis of the input and output (or at least their respective metadata).
Observations:
1. Entropy effectively bounds the performance of the strongest lossless compression possible - see Wikipedia - Entropy (information theory).
2. Both persistent columnar formats and the internal Spark SQL representation transparently apply different compression techniques (like run-length encoding or dictionary encoding) to reduce the memory footprint of the stored data.
3. Additionally, on-disk formats (including plain text data) can be explicitly compressed using general-purpose compression algorithms - it is not clear if this is the case here.
4. Compression (explicit or transparent) is applied to blocks of data (typically partitions, but smaller units can be used).

Based on 1), 2) and 3) we can assume that the average compression rate will depend on the distribution of the data in the cluster. We should also note that the final result can be non-deterministic if the upstream lineage contains wide transformations.
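To make 1) and 4) a bit more tangible, here is a minimal sketch outside of Spark, using plain `java.util.zip.Deflater` (this is not what Spark or Parquet do internally, it only illustrates the entropy effect): the same multiset of values deflates to fewer bytes when equal values are clustered per block than when they are interleaved.

```scala
import java.util.zip.Deflater

// Size of a string after DEFLATE compression.
def deflatedSize(s: String): Int = {
  val deflater = new Deflater()
  deflater.setInput(s.getBytes("UTF-8"))
  deflater.finish()
  val buffer = new Array[Byte](1024)
  var total = 0
  while (!deflater.finished()) total += deflater.deflate(buffer)
  deflater.end()
  total
}

// Same multiset of values, two different layouts.
val values      = Seq.fill(500)("foo") ++ Seq.fill(500)("bar")
val clustered   = values.mkString                            // low entropy per block
val interleaved = scala.util.Random.shuffle(values).mkString // higher entropy

println(deflatedSize(clustered))   // typically a few dozen bytes
println(deflatedSize(interleaved)) // typically noticeably larger
```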
Possible impact of `coalesce` vs. `repartition`:

In general `coalesce` can take two paths:
- Escalate through the pipeline up to the source - the most common scenario.
- Propagate to the nearest shuffle.
In the first case we can expect that the compression rate will be comparable to the compression rate of the input. However, there are some cases where we can achieve a much smaller final output. Let's imagine a degenerate dataset:
```scala
// assuming a spark-shell-like environment with `spark` and `sc` in scope
import spark.implicits._                                  // for toDF and $"..."
import org.apache.spark.sql.functions.spark_partition_id

val df = sc.parallelize(
  Seq("foo", "foo", "foo", "bar", "bar", "bar"),
  6
).toDF
```
If a dataset like this was written to disk there would be no potential for compression - each value has to be written as-is:
```
df.withColumn("pid", spark_partition_id()).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  1|
|  foo|  2|
|  bar|  3|
|  bar|  4|
|  bar|  5|
+-----+---+
```
In other words we need roughly 6 * 3 bytes giving 18 bytes in total.
However, if we `coalesce`:
```
df.coalesce(2).withColumn("pid", spark_partition_id()).show
+-----+---+
|value|pid|
+-----+---+
|  foo|  0|
|  foo|  0|
|  foo|  0|
|  bar|  1|
|  bar|  1|
|  bar|  1|
+-----+---+
```
we can, for example, apply RLE with a small int as the count, and store each partition in 3 + 1 bytes, giving 8 bytes in total.
This is of course a huge oversimplification, but it shows how preserving a low-entropy input structure and merging blocks can result in a lower memory footprint.
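The same arithmetic as a runnable toy sketch (a naive run-length encoder that charges one count byte per run plus the UTF-8 bytes of the value; real Parquet/ORC encoders are far more elaborate):

```scala
// Toy RLE cost model: one count byte per run plus the UTF-8 bytes of the value.
def rleSize(partition: Seq[String]): Int = {
  // collapse consecutive equal values into (value, runLength) pairs
  val runs = partition.foldLeft(List.empty[(String, Int)]) {
    case ((v, n) :: rest, x) if v == x => (v, n + 1) :: rest
    case (acc, x)                      => (x, 1) :: acc
  }
  runs.map { case (v, _) => v.getBytes("UTF-8").length + 1 }.sum
}

// Six single-row partitions: no runs to merge, every value paid for separately.
val sixPartitions = Seq("foo", "foo", "foo", "bar", "bar", "bar").map(Seq(_))
println(sixPartitions.map(rleSize).sum) // 24 = 18 bytes of data + 6 count bytes

// Two coalesced partitions: a single run each.
val twoPartitions = Seq(Seq("foo", "foo", "foo"), Seq("bar", "bar", "bar"))
println(twoPartitions.map(rleSize).sum) // 8 = (3 + 1) * 2
```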
The second `coalesce` scenario is less obvious, but there are scenarios where entropy can be reduced by the upstream process (think for example about window functions) and preserving such structure will be beneficial.
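For instance (a hypothetical sketch reusing the toy `df` from above; the window column and output path are made up for illustration): a window partitioned by the value forces a shuffle that clusters equal keys, and a following `coalesce` only has to reach back to that shuffle, so the clustered layout can carry over into the written files.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, lit}

// partitionBy("value") introduces a shuffle that groups equal values together.
val w = Window.partitionBy("value")
val withCounts = df.withColumn("cnt", count(lit(1)).over(w))

// coalesce merges the already-clustered shuffle output instead of reshuffling it.
withCounts.coalesce(2).write.mode("overwrite").parquet("/tmp/after_window") // example path
```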
What about `repartition`?

Without a partitioning expression `repartition` applies `RoundRobinPartitioning` (implemented as `HashPartitioning` with a pseudo-random key based on the partition id). As long as the hash function behaves sensibly, such redistribution should maximize the entropy of the data and as a result decrease the possible compression rate.
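This is easy to eyeball on the toy `df` from above (a sketch; the row-to-partition assignment is pseudo-random, so the exact output varies between runs):

```scala
// Round-robin repartition spreads rows across partitions regardless of their value,
// so each output partition typically ends up with a mix of "foo" and "bar".
df.repartition(2).withColumn("pid", spark_partition_id()).show()

// The shuffle shows up in the physical plan as an Exchange with RoundRobinPartitioning
// (exact plan text varies between Spark versions).
df.repartition(2).explain()
```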
Conclusion:
`coalesce` shouldn't provide any specific benefits alone, but it can preserve existing properties of the data distribution - this can be advantageous in certain cases.
`repartition`, due to its nature, will on average make things worse, unless the entropy of the data is already maximized (a scenario where things improve is possible, but highly unlikely on a non-trivial dataset).
Finally, `repartition` with a partitioning expression or `repartitionByRange` should decrease entropy and improve compression rates.
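A sketch of both variants on the toy `df` (with only two distinct keys the hash variant may occasionally put both values into one partition):

```scala
// Hash-partitioning by the value sends all equal values to the same partition...
df.repartition(2, $"value").withColumn("pid", spark_partition_id()).show()

// ...and range partitioning groups them into contiguous, sorted ranges.
df.repartitionByRange(2, $"value").withColumn("pid", spark_partition_id()).show()
```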
Note:
We should also keep in mind that columnar formats usually decide on a specific compression / encoding method (or lack of it) based on runtime statistics. So even if the set of rows in a particular block is fixed, a change in the order of rows can lead to different outcomes.
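One hypothetical way to observe this (the dataset, paths and the size comparison below are illustrative assumptions, not measurements from the question): write the same single-partition data once sorted and once in random order, then compare the Parquet footprint.

```scala
import java.io.File
import org.apache.spark.sql.functions.rand

// Same rows, same single partition - only the row order differs.
val big        = spark.range(1000000).select(($"id" % 10).cast("string").as("value"))
val sortedRows = big.coalesce(1).sortWithinPartitions($"value")
val mixedRows  = big.coalesce(1).sortWithinPartitions(rand())

sortedRows.write.mode("overwrite").parquet("/tmp/sorted_example") // example paths
mixedRows.write.mode("overwrite").parquet("/tmp/mixed_example")

// Sum the sizes of the data files written for each variant.
def parquetBytes(dir: String): Long =
  new File(dir).listFiles.filter(_.getName.endsWith(".parquet")).map(_.length).sum

println(parquetBytes("/tmp/sorted_example")) // usually noticeably smaller
println(parquetBytes("/tmp/mixed_example"))
```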