
From the Spark documentation:

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or setting the global SQL option spark.sql.parquet.mergeSchema to true.

(https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)

My understanding from the documentation is that if I have multiple parquet partitions with different schemas, Spark will be able to merge these schemas automatically if I use spark.read.option("mergeSchema", "true").parquet(path).

This seems like a good option if I don't know at query time what schemas exist in these partitions.
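For reference, here is a minimal sketch of the two ways the documentation describes to enable this (assuming an active SparkSession named spark and a Parquet directory at path; the variable names are just for illustration):

// Per-read data source option:
val mergedDF = spark.read.option("mergeSchema", "true").parquet(path)

// Or the global SQL option, which applies to all subsequent Parquet reads:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val alsoMergedDF = spark.read.parquet(path)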

However, consider the case where I have two partitions, one using an old schema, and one using a new schema that differs only in having one additional field. Let's also assume that my code knows the new schema and I'm able to pass this schema in explicitly.

In this case, I would do something like spark.read.schema(my_new_schema).parquet(path). What I'm hoping Spark would do is read both partitions using the new schema and simply supply null values for the new column for any rows in the old partition. Is this the expected behavior? Or do I also need to use option("mergeSchema", "true") here?
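To make the explicit-schema approach concrete, here is a minimal sketch (the field names value and extra_col are hypothetical, just to illustrate an old schema plus one additional field):

import org.apache.spark.sql.types._

// The "new" schema: everything in the old schema plus one additional field.
val my_new_schema = StructType(Seq(
  StructField("value", IntegerType, nullable = true),
  StructField("extra_col", StringType, nullable = true)
))

// Read both partitions with the explicit schema and no mergeSchema option.
val df = spark.read.schema(my_new_schema).parquet(path)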

I'm hoping to avoid using the mergeSchema option if possible in order to avoid the additional overhead mentioned in the documentation.


1 Answer


I've tried extending the example code from the Spark documentation linked above, and my assumptions appear to be correct. See below:

// This is used to implicitly convert an RDD to a DataFrame.
scala> import spark.implicits._
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]

scala> squaresDF.write.parquet("test_data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]

scala> cubesDF.write.parquet("test_data/test_table/key=2")

// Read the partitioned table

scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("test_data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

scala> mergedDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)


// Read without mergeSchema option
scala> val naiveDF = spark.read.parquet("test_data/test_table")
naiveDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 1 more field]

// Note that the cube column is missing.
scala> naiveDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- key: integer (nullable = true)


// Take the schema from the mergedDF above and use it to read the same table with an explicit schema, but without the "mergeSchema" option.
scala> val explicitSchemaDF = spark.read.schema(mergedDF.schema).parquet("test_data/test_table")
explicitSchemaDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

// Spark was able to use the correct schema despite not using the "mergeSchema" option
scala> explicitSchemaDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

// Data is as expected.
scala> explicitSchemaDF.show()
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
|    3|     9|null|  1|
|    4|    16|null|  1|
|    5|    25|null|  1|
|    8|  null| 512|  2|
|    9|  null| 729|  2|
|   10|  null|1000|  2|
|    1|     1|null|  1|
|    2|     4|null|  1|
|    6|  null| 216|  2|
|    7|  null| 343|  2|
+-----+------+----+---+

As you can see, Spark appears to correctly supply null values for any columns missing from the Parquet partitions when an explicit schema is used to read the data.

This makes me feel fairly confident that I can answer my question with "no, the mergeSchema option is not necessary in this case," but I'm still wondering if there are any caveats that I should be aware of. Any additional help from others would be appreciated.