So I have an issue whereby certain rows in a DataFrame get dropped when writing to partitioned Parquet files.
Here are my steps:
- Read CSV data files from S3 with specified schema
- Partition by 'date' column (DateType)
- Write as Parquet with mode=append
The first step, reading, works as expected with no parsing issues. For quality checks I do the following:
For a particular partition (date='2012-11-22'), I count the records in the CSV files, in the loaded DataFrame, and in the Parquet files.
Here's some code to reproduce using pyspark:
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema())
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999; it is always the same record that is omitted
I have tried writing to HDFS too and the result is the same. This happens on multiple partitions. There are no records in the default/null partition. logs_df above is accurate and correct.
The second experiment I tried was to write unpartitioned Parquet files. The only difference from the code above was the omission of partitionBy():
logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')
Loading this Parquet set and running a count as above yielded the correct result of 5000 for date='2012-11-22' and for other dates. Setting mode to overwrite, or leaving it unset (the default), results in the same data loss.
My environment is:
- EMR 5.9.0
- Spark 2.2.0
- Hadoop distribution: Amazon 2.7.3
- Tried with both EMRFS consistent view and not. However most testing was done writing to HDFS to avoid any S3 consistency issues.
I would very much appreciate a fix, a workaround, or another way of converting to Parquet files using Spark.
Thanks,
Edit: I was not able to reproduce the second experiment, so let's say both partitioned and unpartitioned writes seem to drop records when writing to Parquet or JSON. The counts are correct after filter(), so it is always at the write step, when using partitionBy, that it goes wrong. - AtharvaI