While reading a group of Parquet files that were written with inconsistent schemas, we run into a problem with schema merging. After switching to manually specifying the schema, I get the following error. Any pointers would be helpful.
java.lang.UnsupportedOperationException: Unimplemented type: StringType at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389) at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
# Glob pattern for the Parquet files under the dt=<date> directory.
# `date` and `source_file_pattern` each appear twice: once as a directory
# component and once inside the file-name pattern.
_location_fields = (
    source_initial,
    bucket,
    source_prefix,
    date,
    source_file_pattern,
    date,
    source_file_pattern,
)
source_location = "{}/{}/{}/dt={}/{}/*_{}_{}.parquet".format(*_location_fields)
# Explicit read schema, listed as (column name, Spark type) in column order.
# Every field is declared nullable.
# NOTE(review): the reported "Unimplemented type: StringType" raised from
# readDoubleBatch suggests that at least one column declared StringType here
# is physically stored as a double in some files -- verify against the
# actual Parquet footers before trusting these types.
_schema_columns = [
    ("Unnamed", StringType),
    ("nanos", LongType),
    ("book", LongType),
    ("X_o", LongType),
    ("Y_o", LongType),
    ("Z_o", LongType),
    ("Total", DoubleType),
    ("P_v", DoubleType),
    ("R_v", DoubleType),
    ("S_v", DoubleType),
    ("message_type", StringType),
    ("symbol", StringType),
    ("date", StringType),
    ("__index_level_0__", StringType),
]
schema = StructType(
    [StructField(col_name, col_type(), True) for col_name, col_type in _schema_columns]
)
# Log the glob being queried, then load it as Parquet using the explicit schema.
print("Querying data from source location {}".format(source_location))
# NOTE(review): inferSchema does not appear to be a documented Parquet read
# option -- confirm it has any effect here.
# NOTE(review): an explicit schema and mergeSchema are both supplied; confirm
# whether mergeSchema still influences the read once a schema is given.
df_raw = (
    spark.read.format('parquet')
    .schema(schema)
    .options(inferSchema=False, mergeSchema="true")
    .load(source_location)
)
# Restrict rows to the open_nano..close_nano range of `nanos`.
in_session = df_raw.nanos.between(open_nano, close_nano)
df_raw = df_raw.filter(in_session)

# Bucket each row into a window index: ceil(nanos / window_nano) cast to int.
window_index = fun.ceil(df_raw.nanos / (window_nano)).cast("int")
df_raw = df_raw.withColumn("timeInWindow_nano", window_index)

# Sum Total and the three *_v columns per (date, symbol, window), keeping
# the original column names for the aggregated results.
volume_to_pct = (("P_v", "P_pct"), ("R_v", "R_pct"), ("S_v", "S_pct"))
summed_names = ["Total"] + [volume_name for volume_name, _ in volume_to_pct]
summed_columns = [fun.sum(name).alias(name) for name in summed_names]
df_core = df_raw.groupBy("date", "symbol", "timeInWindow_nano").agg(*summed_columns)

# Zero out each *_v column wherever the aggregated Total is negative.
for volume_name, _ in volume_to_pct:
    cleaned = fun.when(df_core.Total < 0, 0).otherwise(df_core[volume_name])
    df_core = df_core.withColumn(volume_name, cleaned)

# Derive the *_pct columns from the cleaned *_v columns.
# NOTE(review): each *_pct is the product of the volume and Total, not a
# ratio -- confirm that multiplication is what is intended.
for volume_name, pct_name in volume_to_pct:
    df_core = df_core.withColumn(pct_name, df_core[volume_name] * df_core.Total)
mergeSchema
option as it is)? – Mariusz