14 votes

I need to join tables using Spark SQL or the DataFrame API, and I would like to know the most efficient way to do it.

The scenario is:

  1. All data is stored in Hive in ORC format (both the base DataFrame and the reference files).
  2. I need to join one base file (DataFrame) read from Hive with 11-13 other reference files to create a large in-memory structure (about 400 columns, around 1 TB in size).

What is the best approach to achieve this? Please share your experience if you have encountered a similar problem.


3 Answers

20 votes

My default advice on how to optimize joins is:

  1. Use a broadcast join if you can, i.e. when one side is small enough to be copied to every executor (see this notebook). From your question it seems your tables are large, so a broadcast join is not an option.

  2. Consider using a very large cluster (it's cheaper than you may think). $250 right now (6/2016) buys about 24 hours of 800 cores with 6 TB of RAM and many SSDs on the EC2 spot instance market, which works out to roughly 1.3 cents per core-hour. When thinking about the total cost of a big data solution, I find that humans tend to substantially undervalue their time.

  3. Use the same partitioner on both sides of the join, so that matching keys are already co-located. See this question for information on co-grouped joins.

  4. If the data is huge and/or your cluster cannot grow enough, so that even (3) above leads to OOM errors, use a two-pass approach. First, repartition the data and persist it as partitioned tables (dataframe.write.partitionBy()). Then join the sub-partitions serially in a loop, "appending" each result to the same final table.
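The two-pass idea can be sketched without Spark, assuming in-memory Scala collections stand in for the persisted partitioned tables. Both sides are first split with the same hash partitioner (as in step 3), then the sub-partitions are joined one pair at a time and each result is stored under its own partition id. `numParts`, `partitionOf`, `split` and `twoPassJoin` are hypothetical names for illustration only.

```scala
object TwoPassJoin {
  // Hypothetical sub-partition count; in Spark this would be the partition
  // layout produced by dataframe.write.partitionBy() in the first pass.
  val numParts = 4

  // The SAME partitioner is applied to both sides, so equal keys get the same id.
  def partitionOf(key: String): Int = Math.floorMod(key.hashCode, numParts)

  // Pass 1: split a keyed dataset into sub-partitions (stands in for persisting).
  def split[V](rows: Seq[(String, V)]): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (k, _) => partitionOf(k) }

  // Pass 2: join sub-partitions serially, so only one partition of each side
  // needs to be held in memory at a time.
  def twoPassJoin[A, B](left: Seq[(String, A)], right: Seq[(String, B)]): Map[Int, Seq[(String, A, B)]] = {
    val l = split(left)
    val r = split(right)
    (0 until numParts).foldLeft(Map.empty[Int, Seq[(String, A, B)]]) { (result, p) =>
      val rightIndex = r.getOrElse(p, Seq.empty).groupBy(_._1)
      val joined = for {
        (k, a) <- l.getOrElse(p, Seq.empty)
        (_, b) <- rightIndex.getOrElse(k, Seq.empty)
      } yield (k, a, b)
      // Each partition's output is written under its own id in the final result.
      result.updated(p, joined)
    }
  }
}

val base = Seq("k1" -> 1, "k2" -> 2, "k3" -> 3)
val ref  = Seq("k1" -> "x", "k3" -> "z", "k4" -> "w")
val out  = TwoPassJoin.twoPassJoin(base, ref).values.flatten.toSet
```

Because no step ever needs more than one partition pair at once, peak memory is bounded by the largest partition rather than by the whole dataset.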

Side note: I say "appending" above because in production I never use SaveMode.Append. It is not idempotent, and that's a dangerous thing. Instead, I use SaveMode.Overwrite on paths deep in the subtree of the partitioned table's directory structure, so each write replaces exactly one partition. Prior to Spark 2.0.0 and 1.6.2, you'll have to delete the _SUCCESS or metadata files, or dynamic partition discovery will choke.
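A toy model (not Spark's API) makes the idempotency point concrete, assuming a partitioned table is just a map from partition path to rows. Retrying a step after a failure duplicates rows with an append, but leaves the table unchanged with an overwrite of that single partition. `ToyTable`, `append` and `overwrite` are hypothetical names.

```scala
import scala.collection.mutable

// Toy stand-in for a partitioned table: partition path -> rows.
final class ToyTable {
  val partitions: mutable.Map[String, Vector[String]] = mutable.Map.empty

  // Behaves like SaveMode.Append: adds rows to whatever is already there.
  def append(path: String, rows: Seq[String]): Unit =
    partitions(path) = partitions.getOrElse(path, Vector.empty) ++ rows

  // Behaves like SaveMode.Overwrite on one leaf partition: replaces only that partition.
  def overwrite(path: String, rows: Seq[String]): Unit =
    partitions(path) = rows.toVector
}

val rows = Seq("r1", "r2")

val appended = new ToyTable
appended.append("part=0", rows)
appended.append("part=0", rows) // retry of the same step duplicates the rows

val overwritten = new ToyTable
overwritten.overwrite("part=0", rows)
overwritten.overwrite("part=0", rows) // retry of the same step is harmless
```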

Hope this helps.

3 votes

Partition the source data using hash partitioning or range partitioning, or write a custom partitioner if you know more about the join keys. Partitioning helps avoid a repartition (shuffle) during the join, because rows with the same key from the corresponding partitions of each table will already be in the same location. ORC will definitely help as well. If the join still spills, try Tachyon (now Alluxio), which will be faster than disk.
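Here is a plain-Scala sketch of the two partitioning schemes, not Spark's `HashPartitioner`/`RangePartitioner` (whose internals differ; for instance, Spark samples the data to choose range bounds). What matters for the join is that applying the same function with the same partition count to both tables sends equal keys to the same partition id. The functions and the bounds below are hypothetical.

```scala
object Partitioning {
  // Hash partitioning: the partition id depends only on the key's hash.
  def hashPartition(key: Int, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Range partitioning: the id is the index of the first upper bound the key
  // does not exceed; keys above every bound go to the last partition.
  def rangePartition(key: Int, upperBounds: IndexedSeq[Int]): Int = {
    val i = upperBounds.indexWhere(key <= _)
    if (i == -1) upperBounds.length else i
  }
}

val bounds  = IndexedSeq(100, 200) // 3 range partitions: <=100, <=200, >200
val keys    = Seq(5, 150, 250, 5)
val byHash  = keys.map(Partitioning.hashPartition(_, 3))
val byRange = keys.map(Partitioning.rangePartition(_, bounds))
```

Hash partitioning spreads keys evenly when the hash is good, while range partitioning keeps nearby keys together, which is useful when you know the key distribution.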

2 votes

Spark uses sort-merge joins to join large tables. Each row of both tables is hashed on the join key, and rows with the same hash are shuffled into the same partition. Within each partition, the keys are sorted on both sides and the sort-merge algorithm is applied. That's the best approach as far as I know.
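The per-partition merge step can be sketched in plain Scala: once both sides of a partition are sorted by key, a single forward pass with two cursors finds every match, including duplicate keys on either side. This is an illustrative inner join on `Int` keys, not Spark's own implementation; `SortMerge.join` is a hypothetical name.

```scala
object SortMerge {
  // Inner-joins two sequences that are already sorted by key.
  def join[A, B](left: IndexedSeq[(Int, A)], right: IndexedSeq[(Int, B)]): Vector[(Int, A, B)] = {
    val out = Vector.newBuilder[(Int, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1       // left key too small: advance left cursor
      else if (lk > rk) j += 1  // right key too small: advance right cursor
      else {
        // Find the run of equal keys on the right, then pair it with every
        // left row that carries the same key.
        var jEnd = j
        while (jEnd < right.length && right(jEnd)._1 == lk) jEnd += 1
        while (i < left.length && left(i)._1 == lk) {
          for (k <- j until jEnd) out += ((lk, left(i)._2, right(k)._2))
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}

val l = Vector(3 -> "c", 1 -> "a", 2 -> "b", 2 -> "b2").sortBy(_._1)
val r = Vector(2 -> "x", 4 -> "y", 1 -> "z").sortBy(_._1)
val joined = SortMerge.join(l, r)
```

After sorting, each side is read only once, which is why the expensive parts of a sort-merge join are the shuffle and the sort rather than the merge itself.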

To drastically speed up your sort-merge joins, write your large datasets as Hive tables with pre-bucketing and pre-sorting (the same number of buckets on both tables) instead of as a flat Parquet dataset.

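// Assumes an active SparkSession named `spark`; the import enables the $"col" syntax.
import spark.implicits._

// Hash-repartition on the bucket columns first, so each write task should hold
// the rows of a single bucket instead of writing a small file for every bucket.
tableA
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_a")

// Same bucket count and bucket columns for tableB, so the buckets line up at join time.
tableB
  .repartition(2200, $"A", $"B")
  .write
  .bucketBy(2200, "A", "B")
  .sortBy("A", "B")
  .mode("overwrite")
  .format("parquet")
  .saveAsTable("my_db.table_b")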

The overhead cost of writing pre-bucketed/pre-sorted tables is modest compared to the benefits.

The underlying dataset will still be Parquet, but the Hive metastore (which can be the Glue metastore on AWS) will contain valuable information about how the table is structured. Because all possible "joinable" rows are co-located, Spark won't shuffle tables that are pre-bucketed (big savings!) and won't sort the rows within the partitions of tables that are pre-sorted.
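Why pre-bucketing removes the shuffle can be checked with a small plain-Scala model, assuming both tables are bucketed on the (A, B) key with the same hash function and the same bucket count. Every matching pair of rows then lands in buckets with the same id, so joining bucket i of one table only with bucket i of the other gives the full result. `numBuckets`, `bucketOf` and the sample rows are hypothetical; Spark itself uses a Murmur3-based hash for bucketing rather than `hashCode`.

```scala
object Buckets {
  val numBuckets = 8 // hypothetical; plays the role of bucketBy(2200, "A", "B")

  // One bucketing function shared by both tables, as identical bucketBy settings would be.
  def bucketOf(key: (String, String)): Int = Math.floorMod(key.hashCode, numBuckets)

  def bucketize[V](rows: Seq[((String, String), V)]): Map[Int, Seq[((String, String), V)]] =
    rows.groupBy(r => bucketOf(r._1))

  // Join bucket i of the left table only with bucket i of the right table (no shuffle).
  def bucketedJoin[A, B](left: Seq[((String, String), A)],
                         right: Seq[((String, String), B)]): Set[((String, String), A, B)] = {
    val lb = bucketize(left)
    val rb = bucketize(right)
    (0 until numBuckets).flatMap { i =>
      for {
        (k1, a) <- lb.getOrElse(i, Seq.empty)
        (k2, b) <- rb.getOrElse(i, Seq.empty)
        if k1 == k2
      } yield (k1, a, b)
    }.toSet
  }

  // Reference result: compare every row with every row, ignoring buckets.
  def naiveJoin[A, B](left: Seq[((String, String), A)],
                      right: Seq[((String, String), B)]): Set[((String, String), A, B)] =
    (for { (k1, a) <- left; (k2, b) <- right if k1 == k2 } yield (k1, a, b)).toSet
}

val rowsA = Seq((("a1", "b1"), 1), (("a2", "b2"), 2), (("a3", "b3"), 3))
val rowsB = Seq((("a1", "b1"), "x"), (("a3", "b3"), "y"), (("a9", "b9"), "z"))
val viaBuckets = Buckets.bucketedJoin(rowsA, rowsB)
```

The bucket-by-bucket result matches the all-pairs join, which is the property that lets Spark skip the exchange step for properly bucketed tables.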

val joined = tableA.join(tableB, Seq("A", "B"))

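Compare the execution plans (for example with joined.explain()) with and without pre-bucketing: with pre-bucketed tables there should be no Exchange (shuffle) step feeding the SortMergeJoin.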

This will not only save you a lot of time during your joins; it will also make it possible to run very large joins on a relatively small cluster without OOM errors. At Amazon, we use this in production most of the time (there are still a few cases where it is not required).

To learn more about pre-bucketing/pre-sorting: