4
votes

I'm trying to use Spark to convert a bunch of CSV files to Parquet, with the interesting case that the input CSV files are already "partitioned" by directory. All the input files have the same set of columns. The input file structure looks like:

/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

I'd like to read those files with Spark and write their data to a Parquet table in HDFS, preserving the partitioning (partitioned by input directory), and such that there is a single output file per partition. The output file structure should look like:

hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet

The best solution I have found so far is to loop over the input directories, loading the CSV files into a dataframe and writing the dataframe to the target partition of the Parquet table. But this is not efficient: since I want a single output file per partition, each write to HDFS is a single task that blocks the loop. I wonder how to achieve this with maximum parallelism (and without shuffling the data across the cluster).
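For reference, a minimal sketch of the loop approach described above, assuming PySpark; the directory names and output path are illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for d in ["dir1", "dir2", "dir3"]:
    (spark.read.csv("/path/" + d, header=True)
          .coalesce(1)                             # force a single output file
          .write.mode("overwrite")
          .parquet("hdfs://path/dir=" + d))        # each write is one blocking job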

Thanks!


2 Answers

1
votes

Rename your input directories, changing dirX to dir=dirX. Then run:

spark.read.csv('/path/').coalesce(1).write.partitionBy('dir').parquet('output')

If you cannot rename the directories, you can use the Hive Metastore. Create an external table with one partition per directory. Then load this table and rewrite it using the pattern above.
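A rough sketch of that Metastore route, assuming PySpark with Hive support enabled; the table name csv_input, the column names, and the comma delimiter are assumptions, not part of the original answer:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS csv_input (col1 STRING, col2 STRING)
    PARTITIONED BY (dir STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
""")

# Register one partition per existing input directory
for d in ["dir1", "dir2", "dir3"]:
    spark.sql(
        "ALTER TABLE csv_input ADD IF NOT EXISTS PARTITION (dir='{0}') "
        "LOCATION '/path/{0}'".format(d)
    )

# Rewrite with the same pattern as above
(spark.table("csv_input")
      .coalesce(1)
      .write.partitionBy("dir")
      .parquet("output"))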

0
votes

The best solution I've found so far (no shuffling, and as many threads as there are input dirs):

  • Create an RDD of input dirs, with as many partitions as input dirs

  • Transform it into an RDD of input files (preserving the partitioning by dir)

  • Flat-map it with a custom CSV parser

  • Convert the RDD to a dataframe

  • Write the dataframe to a Parquet table partitioned by dir

It requires writing your own parser. I could not find a way to preserve the partitioning using sc.textFile or the Databricks CSV parser.
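A hedged PySpark sketch of these steps, under two assumptions that are not in the original answer: the input dirs are on a filesystem the executors can read directly (for HDFS input you would list and open files through the Hadoop FileSystem API instead), and the CSV files have two columns, here called col1 and col2 as placeholders:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

input_dirs = ["/path/dir1", "/path/dir2", "/path/dir3"]

# 1. RDD of input dirs, one partition per dir
dirs_rdd = sc.parallelize(input_dirs, len(input_dirs))

# 2. Expand each dir to its files, staying in the same partition
def list_files(d):
    return [os.path.join(d, f) for f in os.listdir(d) if f.endswith(".csv")]

files_rdd = dirs_rdd.flatMap(list_files)

# 3. Custom CSV parser: emit one tuple per line, tagged with its dir
def parse_file(path):
    d = os.path.basename(os.path.dirname(path))
    with open(path) as f:
        for line in f:
            fields = line.rstrip("\n").split(",")
            yield (d,) + tuple(fields)

rows_rdd = files_rdd.flatMap(parse_file)

# 4. Convert to a dataframe (column names are placeholders)
df = rows_rdd.toDF(["dir", "col1", "col2"])

# 5. Write partitioned by dir; no shuffle, so each output partition
#    is written by the single task that holds that dir's data
df.write.partitionBy("dir").parquet("hdfs://path/output")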