0
votes

We have parquet files generated with two different schemas where we have ID and Amount fields.
File:
file1.snappy.parquet
ID: INT
AMOUNT: DECIMAL(15,6)
Content:
1,19500.00
2,198.34


file2.snappy.parquet
ID: INT
AMOUNT: DECIMAL(15,2)
Content:
1,19500.00
3,198.34

When I load both files together with df3 = spark.read.parquet("output/") and try to read the data, Spark applies the Decimal(15,6) schema to the file whose AMOUNT is Decimal(15,2), and that file's data comes out wrong. Is there a way to read the data correctly in this case?

This is the output I get from df3.show():
+---+------------+
| ID|      AMOUNT|
+---+------------+
|  1|    1.950000|
|  3|    0.019834|
|  1|19500.000000|
|  2|  198.340000|
+---+------------+

As you can see, the AMOUNT values in the 1st and 2nd rows are wrong.
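The wrong values look consistent with how Parquet stores decimals: as an unscaled integer, with the scale kept in the schema. The sketch below uses only Python's decimal module (no Spark) and assumes that file2's unscaled integers are being decoded with scale 6 instead of scale 2. The helper name decode is hypothetical.

```python
from decimal import Decimal


def decode(unscaled: int, scale: int) -> Decimal:
    """Interpret a stored unscaled integer using the given decimal scale."""
    return Decimal(unscaled).scaleb(-scale)


# file2 declares AMOUNT as DECIMAL(15,2), so 19500.00 and 198.34 would be
# stored as the unscaled integers 1950000 and 19834 (assumption).
for unscaled in (1950000, 19834):
    correct = decode(unscaled, 2)   # decoded with file2's own scale
    misread = decode(unscaled, 6)   # decoded with file1's scale
    print(correct, "->", misread)
```

Under that assumption the misread values come out as 1.950000 and 0.019834, which matches the two bad rows in the output above.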

I am looking for suggestions on this. I know the issue goes away if we regenerate the files with the same schema, but that means regenerating and replacing files that have already been delivered. Is there a temporary workaround we can use while we work on regenerating those files?

~R, Krish

2
I have a point here: if Spark allows us to load this data, there should also be an option to load it correctly. Correct me if I am wrong. - Krish
Raised an issue with the Spark community about this: issues.apache.org/jira/browse/SPARK-32317 - Krish

2 Answers

1
votes

You can try setting the mergeSchema option to true. So instead of

df3 = spark.read.parquet("output/") 

Try this:

df3 = spark.read.option("mergeSchema","true").parquet("output/")

But this will give inconsistent records if the two parquet files were written by different versions of Spark. In that case, the newer version of Spark should set the property below to true: spark.sql.parquet.writeLegacyFormat

0
votes

Try reading the column as a string by providing the schema manually when reading the file:

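from pyspark.sql.types import StructType, StructField, StringType

# Declare the column as a nullable string; use your own column name(s) here.
schema = StructType([
    StructField("flag_piece", StringType(), True)
])

spark.read.format("parquet").schema(schema).load(path)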