23
votes

I have run into a problem where I have Parquet data as daily chunks in S3 (in the form of s3://bucketName/prefix/YYYY/MM/DD/) but I cannot read the data in AWS EMR Spark from different dates because some column types do not match and I get one of many exceptions, for example:

java.lang.ClassCastException: optional binary element (UTF8) is not a group

appears when in some files there's an array type which has a value but the same column may have null value in other files which are then inferred as String types.

or

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)

I have raw data in S3 in JSON format and my initial plan was to create an automatic job, which starts an EMR cluster, reads in the JSON data for the previous date and simply writes it as parquet back to S3.

The JSON data is also divided into dates, i.e. keys have date prefixes. Reading JSON works fine. Schema is inferred from the data no matter how much data is currently being read.

But the problem rises when parquet files are written. As I understand, when I write parquet with metadata files, these files contain the schema for all parts/partitions of the parquet files. Which, to me it seems, can also be with different schemas. When I disable writing metadata, Spark was said to infer the whole schema from the first file within the given Parquet path and presume it stays the same through other files.

When some columns, which should be double type, have only integer values for a given day, reading in them from JSON (which has these numbers as integers, without floating points) makes Spark think it is a column with type long. Even if I can cast these columns to double before writing the Parquet files, this still is not good as the schema might change, new columns can be added, and tracking this is impossible.

I have seen some people have the same problems but I have yet to find a good enough solution.

What are the best practices or solutions for this?

3

3 Answers

9
votes

These are the options I use for writing parquet to S3; turning off schema merging boosts writeback performance -it may also address your problem

val PARQUET_OPTIONS = Map(
 "spark.sql.parquet.mergeSchema" -> "false",
 "spark.sql.parquet.filterPushdown" -> "true")
9
votes

As I read the data in daily chunks from JSON and write to Parquet in daily S3 folders, without specifying my own schema when reading JSON or converting error-prone columns to correct type before writing to Parquet, Spark may infer different schemas for different days worth of data depending on the values in the data instances and write Parquet files with conflicting schemas.

It may not be the perfect solution, but the only way I found to solve my problem with an evolving schema is the following:

Before my daily (more specifically nightly) cron job of batch processing previous day's data I am creating a dummy object with mostly empty values.

I make sure the ID is recognizable, for example as the real data has unique ID-s, I add "dummy" string as an ID to the dummy data object.

Then I will give expected values for properties with error-prone types, for example I will give floats/doubles non-zero values so when marshalling to JSON, they would definitely have decimal separator, for example "0.2" instead of "0" (When marshalling to JSON, doubles/floats with 0 values are shown as "0" not "0.0").

Strings and booleans and integers work fine, but in addition to doubles/floats I also needed to instantiate arrays as empty arrays and objects of other classes/structs with corresponding empty objects so they wouldn't be "null"-s, as Spark reads null-s in as strings.


Then if I have all the necessery fields filled, I will marshall the object to JSON and write the files to S3.

Then I would use these files in my Scala batch processing script to read them in, save the schema to a variable and give this schema as a parameter when I read in the real JSON data to avoid Spark doing its own schema inferring.

That way I know all the fields are always of the same type and schema merging is only necessary to join schemas when new fields are added.

Of course it adds a drawback of manually updating the dummy object creation when new fields of error-prone types are added, but this is currently a small drawback as it is the only solution I have found that works.

1
votes

Just make an rdd[String] where each string is a json,when making the rdd as dataframe use primitiveAsString option to make all the datatypes to String

 val binary_zip_RDD = sc.binaryFiles(batchHolder.get(i), minPartitions = 50000)
 // rdd[String]  each string is a json ,lowercased json
    val TransformedRDD = binary_zip_RDD.flatMap(kv => ZipDecompressor.Zip_open_hybrid(kv._1, kv._2, proccessingtimestamp))
 // now the schema of dataframe would be consolidate schema of all json strings
    val jsonDataframe_stream = sparkSession.read.option("primitivesAsString", true).json(TransformedRDD)

    println(jsonDataframe_stream.printSchema())


    jsonDataframe_stream.write.mode(SaveMode.Append).partitionBy(GetConstantValue.DEVICEDATE).parquet(ApplicationProperties.OUTPUT_DIRECTORY)