I have Parquet files that I need to read in Spark. Some files are missing a few columns that are present in newer files.

Since I do not know which files are missing columns, I need to read all the files in Spark. I have a list of columns that I need to read. It may also be the case that all the files are missing some column. I need to put a null in the columns that are missing.

When I try to run sqlContext.sql('query'), it gives me an error saying that the columns are missing.

If I define the schema and do a

sqlContext.read.schema(parquet_schema).parquet('s3://....')

it gives me the same error.

How can I handle this?

1 Answer

You need to use Parquet's schema evolution (schema merging) feature to address this situation.

As described in the Spark documentation:

Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

All you need to do is

val mergedDF = spark.read.option("mergeSchema", "true").parquet("s3://....")

This will give you a DataFrame with the complete merged schema; columns that are missing from a given file are read as null for the rows coming from that file.
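
If some of the columns you need are missing from every file, mergeSchema alone will not add them, because they never appear in any file's footer. A minimal sketch of filling those in afterwards (requiredCols is a hypothetical list of the columns you need, and StringType is a placeholder; use the types you actually expect):

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StringType

// mergedDF is the result of the mergeSchema read above
val requiredCols = Seq("col1", "col2", "col3")  // hypothetical column list

// Add any required column that is still absent after the merge as a null column
val completeDF = requiredCols.foldLeft(mergedDF) { (df, c) =>
  if (df.columns.contains(c)) df else df.withColumn(c, lit(null).cast(StringType))
}

// Keep only the columns you care about, in a fixed order
val resultDF = completeDF.select(requiredCols.map(col): _*)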

Pain point

If your schemas are not compatible, for example one Parquet file has col1 with DataType String and another Parquet file has col1 with DataType Long, then the schema merge will fail.
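
One way around that, sketched below under the assumption that you know which paths hold which variant (the paths and the col1 example are hypothetical), is to read the conflicting groups separately, cast the offending column to a common type, and union them yourself instead of relying on mergeSchema:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// Hypothetical paths: old files store col1 as String, new files as Long
val oldDF = spark.read.parquet("s3://bucket/old/")
  .withColumn("col1", col("col1").cast(LongType))  // cast String -> Long
val newDF = spark.read.parquet("s3://bucket/new/")

// unionByName matches columns by name; this assumes both sides now expose
// the same set of columns (combine with the null-filling step above if not)
val combinedDF = oldDF.unionByName(newDF)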