2
votes

So I have an issue where certain rows in a DataFrame get dropped when writing to partitioned Parquet files.

Here are my steps:

  1. Read CSV data files from S3 with specified schema
  2. Partition by 'date' column (DateType)
  3. Write as Parquet with mode=append

The first step (reading) works as expected, with no parsing issues. For quality checks I do the following:

For a particular partition, date='2012-11-22', I perform a count on the CSV files, the loaded DataFrame, and the Parquet files.

Here's some code to reproduce using PySpark:

logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema())
logs_df.filter(logs_df.date=='2012-11-22').count() # results in 5000
logs_df.write.partitionBy('date').parquet('s3://.../logs_2012_parquet/', mode='append')
par_df = spark.read.parquet('s3://.../logs_2012_parquet/')
par_df.filter(par_df.date=='2012-11-22').count() # results in 4999, always the same record that is omitted
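
To pinpoint the record that goes missing, something like a set difference between the source DataFrame and the Parquet round trip should work. This is only a sketch: subtract() compares distinct rows, and the read-back Parquet moves the partition column to the end, hence the reordering select.

cols = logs_df.columns
missing = (logs_df.filter(logs_df.date == '2012-11-22')
           .subtract(par_df.filter(par_df.date == '2012-11-22').select(*cols)))
missing.show(truncate=False)  # the one row that never makes it into Parquet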

I have tried writing to HDFS too and the result is the same. This happens on multiple partitions. There are no records in the default/null partition. The logs_df above is accurate and correct.
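
The per-partition comparison looks roughly like this (only a sketch, reusing logs_df and par_df from the snippet above):

src_counts = logs_df.groupBy('date').count().withColumnRenamed('count', 'src_count')
pq_counts = par_df.groupBy('date').count().withColumnRenamed('count', 'pq_count')

# Dates whose counts differ between the source DataFrame and the Parquet output
(src_counts.join(pq_counts, 'date', 'full_outer')
           .filter('src_count IS NULL OR pq_count IS NULL OR src_count != pq_count')
           .show())

# Nothing ends up in a null/default partition:
logs_df.filter(logs_df.date.isNull()).count()  # 0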

The second experiment I tried was to write unpartitioned Parquet files. The only difference from the code above was the omission of partitionBy():

logs_df.write.parquet('s3://.../logs_2012_parquet/', mode='append')

Loading this Parquet set and running a count as above yielded the correct result of 5000 for date='2012-11-22' and other dates. Setting mode to overwrite, or not setting it at all (using the default), results in the same data loss.

My environment is:

  • EMR 5.9.0
  • Spark 2.2.0
  • Hadoop distribution: Amazon 2.7.3
  • Tried with both EMRFS consistent view enabled and disabled. However, most testing was done writing to HDFS to avoid any S3 consistency issues.

I would very much appreciate a fix, a workaround, or another way of converting to Parquet files using Spark.

Thanks,

Edit: I was not able to reproduce the second experiment, so let's say both partitioned and unpartitioned writes seem to drop records when writing to Parquet or JSON.

Is it always the same record which is missing? Is the date well-defined for all the records in your dataframe? - Alexandre Dupriez
Yes, it's the same one. The date is formatted correctly, and the DataFrame contains the record, as I can identify it using filter(). So it is always the write step with partitionBy that goes wrong. - AtharvaI
Any ideas why certain rows get dropped on writing to HDFS or S3? I tried this with the columns as STRINGs too. Can't figure out why. - AtharvaI
Also, as an experiment, I tried writing out JSON files and reading them back; the same rows get dropped :'( - AtharvaI

1 Answer

0
votes

So the mystery was definitely in the schema definitions. However, unexpectedly, it wasn't the dates or timestamps; it was in fact the boolean values.

I had exported the CSV from Redshift, which writes booleans as t and f. When I inspected the inferred schema, those fields were marked as string types. A simple test with true and false in a CSV file recognised them as boolean.
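
For anyone else hitting this, a workaround sketch: declare the Redshift boolean columns as StringType in get_schema() and convert the 't'/'f' values after the read. The column name is_active below is hypothetical, just for illustration.

from pyspark.sql import functions as F

# is_active is a hypothetical column that Redshift exported as 't'/'f';
# it is declared as StringType in get_schema() for this to work.
logs_df = spark.read.csv('s3://../logs_2012/', multiLine=True, schema=get_schema())
logs_df = logs_df.withColumn(
    'is_active',
    F.when(F.col('is_active') == 't', True)
     .when(F.col('is_active') == 'f', False))  # anything else becomes null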

So I was expecting date and timestamp parsing to go wrong, as usual, but it was the booleans. Lesson learnt.

Thanks for the pointers.