1
votes

While reading inconsistent schema written group of parquet files, we have issue on schema merging. On switching to manually specifying schema i get following error. Any pointer will be helpful.

java.lang.UnsupportedOperationException: Unimplemented type: StringType at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)

source_location = "{}/{}/{}/dt={}/{}/*_{}_{}.parquet".format(source_initial,
                                                                       bucket,
                                                                       source_prefix,
                                                                       date,
                                                                       source_file_pattern,
                                                                       date,
                                                                       source_file_pattern)
schema = StructType([
        StructField("Unnamed", StringType(), True),StructField("nanos", LongType(), True),StructField("book", LongType(), True),
        StructField("X_o", LongType(), True),StructField("Y_o", LongType(), True),StructField("Z_o", LongType(), True),
        StructField("Total", DoubleType(), True),StructField("P_v", DoubleType(), True),StructField("R_v", DoubleType(), True),
        StructField("S_v", DoubleType(), True),StructField("message_type", StringType(), True),StructField("symbol", StringType(), True),
        StructField("date", StringType(), True),StructField("__index_level_0__", StringType(), True)])

print("Querying data from source location {}".format(source_location))
df_raw = spark.read.format('parquet').load(source_location, schema = schema, inferSchema = False,mergeSchema="true")
df_raw = df_raw.filter(df_raw.nanos.between(open_nano,close_nano))
df_raw = df_raw.withColumn("timeInWindow_nano",(fun.ceil(df_raw.nanos/(window_nano))).cast("int"))
df_core = df_raw.groupBy("date","symbol","timeInWindow_nano").agg(fun.sum("Total").alias("Total"),
                                                     fun.sum("P_v").alias("P_v"),
                                                     fun.sum("R_v").alias("R_v"),
                                                     fun.sum("S_v").alias("S_v"))

df_core = df_core.withColumn("P_v",fun.when(df_core.Total < 0,0).otherwise(df_core.P_v))
df_core = df_core.withColumn("R_v",fun.when(df_core.Total < 0,0).otherwise(df_core.R_v))
df_core = df_core.withColumn("S_v",fun.when(df_core.Total < 0,0).otherwise(df_core.S_v))
df_core = df_core.withColumn("P_pct",df_core.P_v*df_core.Total)
df_core = df_core.withColumn("R_pct",df_core.R_v*df_core.Total)
df_core = df_core.withColumn("S_pct",df_core.S_v*df_core.Total)
1
What happens, if you do not specify schema manually (but leave mergeSchema option as it is)?Mariusz
@Mariusz :As there are some file with out any content as case of no events, so auto inferring results in org.apache.spark.SparkException: Failed merging schema of file.Srikant

1 Answers

5
votes

You cannot read parquet files in one load if schemas are not compatible. My advice would be to separate this to two loads and then union dataframes when you have them compatible. See example code:

schema1_df = spark.read.parquet('path/to/files/with/string/field.parquet')
schema2_df = spark.read.parquet('path/to/files/with/double/field.parquet')
df = schema2_df.unionAll(schema1.df.withColumn('invalid_col', schema2_df.invalid_col.cast('double')))