2 votes

I am working in Spark (on Azure Databricks) with a 15-billion-row file that looks like this:

+---------+---------------+----------------+-------------+--------+------+
|client_id|transaction_key|transaction_date|   product_id|store_id| spend|
+---------+---------------+----------------+-------------+--------+------+
|        1|  7587_20121224|      2012-12-24|     38081275|     787|  4.54|
|        1| 10153_20121224|      2012-12-24|         4011|    1053|  2.97|
|        2|  6823_20121224|      2012-12-24|    561122924|     683|  2.94|
|        3| 11131_20121224|      2012-12-24|     80026282|    1131|   0.4|
|        3|  7587_20121224|      2012-12-24|        92532|     787|  5.49|
+---------+---------------+----------------+-------------+--------+------+

This data is used for all my queries, which consist mostly of groupBy (on product_id, for example), sum, and count distinct:

from pyspark.sql.functions import col, sum, countDistinct

results = (trx.filter((col("transaction_date") > "2018-01-01")
                      & col("product_id").isin(["38081275", "4011"]))
              .groupby("product_id")
              .agg(sum("spend").alias("total_spend"),
                   countDistinct("transaction_key").alias("number_trx")))

I never need to use 100% of this data; I always start with a filter on:

  • transaction_date (1,000 distinct values)
  • product_id (1,000,000 distinct values)
  • store_id (1,000 distinct values)

==> What is the best way to partition this data in Parquet files?

I initially partitioned the data on transaction_date:

trx.write.format("parquet").mode("overwrite").partitionBy("transaction_date").save("dbfs:/linkToParquetFile")

This creates partitions of approximately the same size. However, most queries need to keep at least 60% of the transaction_date values, whereas only a few product_id values are usually selected in a single query (and about 70% of the store_id values are usually kept).

==> Is there a way to write the Parquet files taking this into account?

It seems partitioning the data on product_id would create way too many partitions...

Thanks!


1 Answer

1 vote

For example, you can use multiple columns for partitioning (it creates sub-folders), and Spark can apply partition filters on them.

Another good idea is bucketing, to avoid an extra shuffle during aggregations on the bucketed column.

Example with a Hive table:

(trx.write
    .partitionBy("transaction_date", "store_id")  # one sub-folder per (transaction_date, store_id) combination
    .bucketBy(1000, "product_id")                 # hash product_id into 1000 buckets within each partition
    .saveAsTable("tableName"))

To read it, use:

spark.table("tableName")
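A query shaped like the one in the question can then read only the matching sub-folders (partition pruning on transaction_date and store_id) and group by the bucket column product_id. This is a minimal sketch reusing the tableName table from above; the filter values are just the sample ones from the question, and whether the extra shuffle is actually skipped depends on your Spark version, so check the plan with explain():

from pyspark.sql.functions import col, sum, countDistinct

results = (spark.table("tableName")
           # partition pruning: only sub-folders for these dates and stores are scanned
           .filter((col("transaction_date") > "2018-01-01")
                   & col("store_id").isin([787, 1053]))
           .filter(col("product_id").isin(["38081275", "4011"]))
           # grouping on the bucket column lets Spark reuse the existing hash distribution
           .groupby("product_id")
           .agg(sum("spend").alias("total_spend"),
                countDistinct("transaction_key").alias("number_trx")))

results.explain()  # look for PartitionFilters and for the absence of an Exchange before the aggregate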