First, let me introduce my use case: every day I receive roughly 500 million rows that look like this:
ID | Categories
1 | cat1, cat2, cat3, ..., catn
2 | cat1, catx, caty, ..., anothercategory
Input data: 50 compressed CSV files of ~250 MB each -> 12.5 GB compressed in total.
The goal is to answer questions like: find all IDs that belong to catx and caty, find IDs that belong to cat3 but not caty, and so on; in other words, arbitrary set operations such as ids in cat1 ∪ cat2 or cat3 ∩ catx.
Since categories are created dynamically (every day I get a new set of categories) and the business wants to explore all possible intersections and unions (there is no fixed set of queries), I came up with the following solution:
I wrote a Spark job that transforms the data into a wide sparse matrix whose columns are all possible categories plus an ID column; for each row and column I set true if the ID belongs to that category and false otherwise (a sketch of this transformation follows the example below):
ID | cat1 | cat2 | cat3 |...| catn | catx | caty | anothercategory |....
1 | true | true | true |...| true | false |false |false |....
2 | true |false |false |...|false | true | true | true |....
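In case it clarifies the approach, here is a minimal sketch of that transformation in Spark (Scala). The input path, delimiter and header settings are placeholders for illustration, not my actual job:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().appName("sparse-category-matrix").getOrCreate()
    import spark.implicits._

    // Read the daily dump; path, delimiter and header option are placeholders.
    val raw = spark.read
      .option("header", "false")
      .option("delimiter", "|")
      .csv("s3a://my-bucket/input/*.csv.gz")
      .toDF("id", "categories")

    // One (id, category) pair per row, then pivot categories into boolean columns.
    val pairs = raw.select($"id", explode(split($"categories", ",\\s*")).as("category"))

    val wide = pairs
      .groupBy("id")
      .pivot("category")         // one column per distinct category seen that day
      .agg(first(lit(true)))     // true where the pair exists, null otherwise

    // Turn the nulls left by the pivot into explicit false values.
    val matrix = wide.columns.filter(_ != "id").foldLeft(wide) { (df, c) =>
      df.withColumn(c, coalesce(col(c), lit(false)))
    }

With ~1000 categories per day this stays well under the default spark.sql.pivotMaxValues limit of 10000, so the pivot needs no extra configuration.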
SQL can then answer my questions directly; for instance, to find all IDs that belong to both cat1 and catx, I run the following query against this matrix:
SELECT id FROM MyTable WHERE cat1 = true AND catx = true;
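(For completeness, the same query can be run through Spark SQL against the wide DataFrame from the sketch above; the table name is just the MyTable placeholder from this example.)

    // Spark 2.x; on 1.6 the equivalent call is registerTempTable.
    matrix.createOrReplaceTempView("MyTable")
    spark.sql("SELECT id FROM MyTable WHERE cat1 = true AND catx = true").show()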
I chose to save this sparse matrix as compressed Parquet. Given the sparsity and the nature of the queries (each query touches only a few of the many columns), I believe columnar storage is the most appropriate format.
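The write step is roughly the following; the output path and the Snappy codec are assumptions for illustration:

    // Spark 2.x: the codec can be set per write; on 1.6 it is controlled by
    // the spark.sql.parquet.compression.codec setting instead.
    matrix.write
      .option("compression", "snappy")
      .mode("overwrite")
      .parquet("s3a://my-bucket/sparse-matrix/")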
Now that the use case is described, here are my observations; I may be missing some optimization points:
- The 12.5 GB of compressed input grows to ~300 GB after the transformation, and writing this sparse matrix as Parquet takes too much time and too many resources: 2-3 hours on a Spark 1.6 standalone cluster of 6 AWS r4.4xlarge instances (with enough parallelism configured to spread the work across all the workers I have).
- I ended up with too many Parquet files: the more I parallelize, the smaller the Parquet files get. Each RDD partition seems to produce a single Parquet file, and too many small files is not optimal to scan, since my queries go through all the column values (see the sketch after this list for how I try to control the file count).
- I went through a lot of posts but still don't understand why writing 500 million rows / 1000 columns of compressed Parquet to S3 takes this much time; once on S3, the small files add up to ~35 GB.
- Looking at the application master UI, the job hangs on the write stage; the transformation and shuffle stages do not appear to be resource- or time-consuming.
- I tried to tweak Parquet parameters such as the row group size, page size and dictionary encoding, but did not see any performance improvement.
- I tried to repartition into bigger RDD partitions and write those to S3 in order to get bigger Parquet files, but the job took too much time and I finally killed it.
- I could run the job in ~1 hour using a Spark 2.1 standalone cluster of 4 AWS r4.16xlarge instances, but I feel like I am using a huge cluster for a small improvement; the only benefit I got is running more parallel tasks. Am I missing something? Maybe I can leverage the ~1 TB of RAM to do this better and get bigger Parquet files.
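To make the file-count and Parquet-tuning points concrete, here is a sketch of the kind of tuning I mean; the row group size, page size and target partition count below are illustrative assumptions, not values I have validated:

    // Parquet-MR knobs are picked up from the Hadoop configuration.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.setInt("parquet.block.size", 256 * 1024 * 1024)   // row group size
    hadoopConf.setInt("parquet.page.size", 1024 * 1024)
    hadoopConf.setBoolean("parquet.enable.dictionary", false)

    // Each partition becomes one output file, so shrink the partition count
    // just before the write; ~35 GB of output / ~180 MB per file ≈ 200 files.
    matrix
      .repartition(200)
      .write
      .option("compression", "snappy")
      .parquet("s3a://my-bucket/sparse-matrix/")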
Do you have any feedback on writing large Parquet files to S3 with Spark?
I would also like to hear your opinions/criticism of this solution.
Thanks and Regards.