I use Spark (specifically PySpark) and have the following scenario:

  • There are two tables (A and B).
  • A is partitioned by columns c and d.
  • I want to read all the data from A, do a tiny transformation (e.g. adding one static column) and then save the result to B.
  • B should have the exact same partitioning as A.
  • Shuffling needs to be avoided, since the table has 3 billion rows.

So from a logical perspective, it should be very easy to make this happen without any shuffling.

But how?

Version 1

from pyspark.sql import functions as f

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df.write.format("parquet").save("pathToB")

In this case, table B is not partitioned at all.

Version 2

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df.write.format("parquet").partitionBy("c", "d").save("pathToB")

In this case, there is a lot of shuffling going on, and it takes forever.

Version 3

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df = df.repartition("c", "d")
df.write.format("parquet").partitionBy("c", "d").save("pathToB")

Same as version 2: lots of shuffling, and it never finishes.

If anyone has an idea on how to achieve this without shuffling, it would be very helpful! Thanks a lot in advance!

Best regards, Benjamin

1 Answer

De facto, you can't. Version 2 is the correct way:

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df.write.format("parquet").partitionBy("c", "d").save("pathToB")
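
To see whether a given version really introduces a shuffle, one option is to look for Exchange operators in the DataFrame's physical plan. The sketch below is only a rough check under a few assumptions: plan_text and has_shuffle are hypothetical helper names, the check relies on the text that df.explain() prints, and it only covers the DataFrame's own plan, not anything the writer adds on top.

```python
import contextlib
import io
import re


def plan_text(df):
    """Return whatever df.explain() prints, captured as a string."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def has_shuffle(plan):
    """True if the plan text names an Exchange that is not a BroadcastExchange.

    This is a plain text match, so treat the result as a hint only.
    """
    return re.search(r"(?<!Broadcast)Exchange", plan) is not None
```

For example, has_shuffle(plan_text(df.repartition("c", "d"))) could be compared against has_shuffle(plan_text(df)) to see what the extra repartition in version 3 adds.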

What you could do instead, if version 1 really does not shuffle at all, is to read each combination of c and d that exists on its own and write it to the matching directory, which is the same as running version 1 once per partition. But I doubt that will be faster:

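# Select only c and d first, so that each collected Row has exactly two fields.
permutations = [(row["c"], row["d"]) for row in df.select("c", "d").distinct().collect()]

for (c, d) in permutations:
  # Assumes c and d are string columns; adjust the quoting for other types.
  df = spark.read.parquet("pathToA").filter(f'c = "{c}" AND d = "{d}"')
  # Drop c and d from the data, since the target directory name already encodes them.
  df = df.withColumn("x", f.lit("x")).drop("c", "d")
  df.write.format("parquet").save(f'pathToB/c={c}/d={d}')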

This is something of a workaround, and I think that if you have a Hive table on top of that folder, you would need to run MSCK REPAIR TABLE to refresh its partitions.
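
A minimal sketch of that last step, assuming a Hive table already exists on top of pathToB. The helper names and the table name my_db.table_b are hypothetical, and spark is expected to be an existing SparkSession:

```python
def partition_dir(base, **parts):
    """Build a Hive-style partition directory such as base/c=1/d=x."""
    return "/".join([base.rstrip("/")] + [f"{k}={v}" for k, v in parts.items()])


def repair_statement(table):
    """SQL that asks the metastore to pick up partition directories it does not know yet."""
    return f"MSCK REPAIR TABLE {table}"


def refresh_partitions(spark, table):
    """Run the repair statement on the given SparkSession (table name is an assumption)."""
    spark.sql(repair_statement(table))
```

After the loop has written every directory, a single call such as refresh_partitions(spark, "my_db.table_b") should be enough, since the repair scans the whole table location rather than one partition.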