
I have a partitioned Parquet location on HDFS where different partitions have different schemas.

Say the first partition has 5 columns and the second partition has 4. Now I read the base Parquet path and then filter for the 2nd partition.

This gives me 5 columns in the DataFrame even though the Parquet files in the 2nd partition contain only 4 columns. When I read the 2nd partition path directly, I get the correct 4 columns. How can I fix this?
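For illustration, a minimal sketch of the two reads described above (the base path, the partition column name part, and its value are placeholders, not my actual data):

import org.apache.spark.sql.functions.col

// reading the base path and filtering on the partition column:
// the DataFrame still shows the 5-column schema
val filtered = spark.read.parquet("/data/base_path").filter(col("part") === 2)
filtered.printSchema()

// reading the 2nd partition's directory directly:
// the DataFrame shows only the 4 columns present in its files
val direct = spark.read.parquet("/data/base_path/part=2")
direct.printSchema()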


2 Answers

4 votes

You can specify the required schema (4 columns) while reading the Parquet files!

  • Spark then reads only the fields included in the schema; if a field does not exist in the data, null will be returned for it.

Example:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// schema with column "i" (present in the data) and column "z" (absent from the data)
val sch = new StructType().add("i", IntegerType).add("z", StringType)
spark.read.schema(sch).parquet("<parquet_file_path>").show()

// the data has field i but does not have field z, so z is returned as null
//+---+----+
//|  i|   z|
//+---+----+
//|  1|null|
//+---+----+
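Applied to your case, a sketch that reads the base path with only the columns the 2nd partition actually has; the column names, types, path, and partition column part below are placeholders:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.col

// only the 4 columns present in the 2nd partition, plus the partition column
val partition2Schema = new StructType()
  .add("col1", IntegerType)
  .add("col2", StringType)
  .add("col3", StringType)
  .add("col4", DoubleType)
  .add("part", IntegerType)          // partition column, kept so it can be filtered on

spark.read
  .schema(partition2Schema)
  .parquet("<base_parquet_path>")    // base path, not the partition sub-directory
  .filter(col("part") === 2)
  .drop("part")                      // drop the partition column to keep the 4 data columns
  .show()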
0 votes

I would really like to help you, but I am not sure what you actually want to achieve. What is your intention here?

If you want to read the Parquet data with all of its partitions and you just want the merged set of columns from both partitions, the read option "mergeSchema" may fit your need.

Like Protocol Buffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or setting the global SQL option spark.sql.parquet.mergeSchema to true.
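For example, a sketch of both ways to enable it (the path is a placeholder):

// per-read data source option
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("<base_parquet_path>")

// or globally for the session
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val mergedGlobal = spark.read.parquet("<base_parquet_path>")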

Refer to the Spark documentation on Parquet schema merging.

So it would be interesting to know which version of Spark you are using and how the properties spark.sql.parquet.mergeSchema (Spark SQL setting) and mergeSchema (per-read option) are set.
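You can check both directly in the shell (spark is the active SparkSession; the second lookup falls back to "false", the default since 1.5.0):

println(spark.version)
println(spark.conf.get("spark.sql.parquet.mergeSchema", "false"))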