I want to know if Spark knows the partitioning key of the parquet file and uses this information to avoid shuffles.
Context:
Running Spark 2.0.1 with a local SparkSession. I have a CSV dataset that I am saving as a parquet file on disk like so:
val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv")
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.save("SomeFile.parquet")
I am creating 42 partitions by column numerocarte. This should group rows with the same numerocarte into the same partition. I don't want to do partitionBy("numerocarte") at write time because I don't want one partition per card; there would be millions of them.
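For contrast, here is a minimal runnable sketch (with hypothetical toy data; the column name and 42 come from the question) of what repartition by column does versus partitionBy at write time:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Toy stand-in data; only the column name matches the question.
val spark = SparkSession.builder().master("local[*]").appName("repartition-sketch").getOrCreate()
import spark.implicits._

val cards = Seq(("A", 1.0), ("B", 2.0), ("A", 3.0)).toDF("numerocarte", "dollars")

// repartition: hash-partitions rows on numerocarte into exactly 42 in-memory partitions,
// so many distinct cards share one partition and the write produces 42 part-files.
val byCard = cards.repartition(42, col("numerocarte"))
println(byCard.rdd.getNumPartitions) // 42

// partitionBy at write time would instead create one directory per distinct value:
// cards.write.partitionBy("numerocarte").parquet("/tmp/cards") // one dir per card
```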
After that, in another script, I read SomeFile.parquet and run some operations on it. In particular, I run a window function where the partitioning is done on the same column the parquet file was repartitioned by.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
  sum(col("dollars")).over(w))
After the read I can see that the repartition worked as expected: DataFrame df2 has 42 partitions, and each of them contains different cards.
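To make the partition counts and the shuffle visible, here is a self-contained sketch (hypothetical toy data standing in for df2, which in the question comes from the parquet read; df3 is a name I introduce for the window result):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical stand-in for df2; column names match the question.
val spark = SparkSession.builder().master("local[*]").appName("window-shuffle-check").getOrCreate()
import spark.implicits._

val df2 = Seq(("A", "x", 1.0), ("B", "y", 2.0), ("A", "z", 3.0))
  .toDF("numerocarte", "SomeColumn", "dollars")
  .repartition(42, col("numerocarte"))

val w = Window.partitionBy(col("numerocarte")).orderBy(col("SomeColumn"))
val df3 = df2.withColumn("NewColumnName", sum(col("dollars")).over(w))

println(df2.rdd.getNumPartitions) // 42, as set by repartition
println(df3.rdd.getNumPartitions) // 200: the default spark.sql.shuffle.partitions
df3.explain() // a shuffle shows up as an Exchange hashpartitioning(...) node in the plan
```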
Questions:
- Does Spark know that the DataFrame df2 is partitioned by column numerocarte?
- If it knows, then there will be no shuffle in the window function. True?
- If it does not know, it will do a shuffle in the window function. True?
- If it does not know, how do I tell Spark the data is already partitioned by the right column?
- How can I check the partitioning key of a DataFrame? Is there a command for this? I know how to check the number of partitions, but how do I see the partitioning key?
- When I print the number of partitions after each step, I have 42 partitions after read and 200 partitions after withColumn, which suggests that Spark repartitioned my DataFrame.
- If I have two different tables repartitioned on the same column, would the join use that information?
You can check the partitioner with df.rdd.partitioner. If two DataFrames have the same partitioner, there may be no shuffle. You can check whether there will be a shuffle by calling df.explain. To check the number of partitions call df.rdd.partitions.length. For a more complete explanation of partitioning see jaceklaskowski.gitbooks.io/mastering-apache-spark/… – addmeaning
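A minimal sketch of the checks this comment describes (hypothetical toy data standing in for the question's df2; note that in Spark 2.x df.rdd.partitioner is typically None for a DataFrame, because Dataset-level hash partitioning is not exposed as an RDD Partitioner):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical stand-in DataFrame; in the question this would be df2 read from parquet.
val spark = SparkSession.builder().master("local[*]").appName("partitioner-check").getOrCreate()
import spark.implicits._

val df = Seq(("A", 1.0), ("B", 2.0)).toDF("numerocarte", "dollars")
  .repartition(42, col("numerocarte"))

println(df.rdd.partitioner)        // usually None: the DataFrame's hash layout is not an RDD Partitioner
println(df.rdd.partitions.length)  // 42
df.explain()                       // shuffles appear as Exchange nodes in the physical plan
```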