
I want to join two very large tables on a shared key using Spark, and I'm trying to understand the optimal way to do that.

For example, let's say:

  • table 1 contains 900M rows and ~100 columns
  • table 2 contains 600M rows and ~200 columns.
  • We can't use a "broadcast join"; both tables are too big to be broadcast.

I want to inner join the tables on the shared 'id' column that exists in both of them. I also know that the id columns contain the same values in both tables: there is no id value that exists in one table but not in the other.

The ideal approach I can think of is to "divide" each table into partitions/buckets that contain the same 'id' values, and to send matching partitions to the same executor, which then computes the join result with minimal data shuffling across the cluster.
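A toy, single-machine model of that idea, written in plain Python rather than Spark: both tables are split into the same number of partitions by hashing 'id', and partition i of one table is joined only with partition i of the other. The helper names (hash_partition, local_join, copartitioned_join) are hypothetical, and Python's hash() stands in for the Murmur3 hash Spark actually uses, so the exact grouping differs from Spark's.

```python
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    # Toy stand-in for hash partitioning: row goes to hash(key) % num_partitions.
    # (Spark hashes with Murmur3, not Python's hash().)
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[key]) % num_partitions].append(row)
    return parts

def local_join(left, right, key):
    # Inner join of two partitions using an in-memory hash table on the right side.
    index = defaultdict(list)
    for r in right:
        index[r[key]].append(r)
    return [{**l, **r} for l in left for r in index.get(l[key], [])]

def copartitioned_join(left_rows, right_rows, key, num_partitions):
    left_parts = hash_partition(left_rows, key, num_partitions)
    right_parts = hash_partition(right_rows, key, num_partitions)
    result = []
    # Partition i of the left side only ever needs partition i of the right side.
    for lp, rp in zip(left_parts, right_parts):
        result.extend(local_join(lp, rp, key))
    return result

df1 = [{"id": 1, "name": "David"}, {"id": 2, "name": "Lily"}, {"id": 3, "name": "Dan"}]
df2 = [{"id": 1, "price": 30.8}, {"id": 1, "price": 40.3},
       {"id": 2, "price": 100.0}, {"id": 3, "price": 99.0}]
joined = copartitioned_join(df1, df2, "id", num_partitions=2)
print(sorted((r["id"], r["price"]) for r in joined))
# prints [(1, 30.8), (1, 40.3), (2, 100.0), (3, 99.0)]
```

Because every row with a given id lands at the same partition index on both sides, no cross-partition pairs are needed, and the partition-wise result matches a full inner join.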

My questions are:

  1. If I use, for example, .repartition(5, 'id') on each of the tables, will each of the 5 partitions contain the same 'id' values in both tables? (as long as both tables have the same set of 'id' values)

For example:

df1
+---+---+------+
|age| id|  name|
+---+---+------+
|  5|  1| David|
| 50|  2|  Lily|
| 10|  3|   Dan|
| 15|  4|Nicole|
| 16|  5|  Dana|
| 19|  6|   Ron|
| 20|  7| Alice|
| 22|  8|  Nora|
| 45|  9|  Sara|
| 70| 10| Aaron|
+---+---+------+


df2
+---+-----+
| id|price|
+---+-----+
|  1| 30.8|
|  1| 40.3|
|  2|100.0|
|  2| 30.1|
|  3| 99.0|
|  3|102.0|
|  4| 81.2|
|  4| 91.2|
|  5| 73.4|
|  6| 22.2|
|  7|374.4|
|  8|669.7|
|  9|  4.8|
| 10|35.38|
+---+-----+

df1 = df1.repartition(5, 'id')
df2 = df2.repartition(5, 'id')

If df1's partitions are: [id=1,id=2],[id=3,id=4],[id=5,id=6],[id=7,id=8],[id=9,id=10]

Is it necessarily the same for df2?
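A plain-Python sketch of what this question is asking, assuming hash partitioning (which is what repartition(numPartitions, col) does in Spark). The partition index is a pure function of the key value and the number of partitions, so the same id gets the same index in both tables. Python's hash() is only a stand-in for Spark's Murmur3 hash, and the names partition_of and layout are hypothetical.

```python
def partition_of(key, num_partitions):
    # Stand-in for hash partitioning. Python's hash() of a small non-negative
    # int is the int itself, so this reduces to key % num_partitions here.
    return hash(key) % num_partitions

def layout(ids, num_partitions):
    # Which distinct ids end up in each partition.
    parts = [set() for _ in range(num_partitions)]
    for i in ids:
        parts[partition_of(i, num_partitions)].add(i)
    return [sorted(p) for p in parts]

df1_ids = list(range(1, 11))                          # ids of df1 above
df2_ids = [1, 1, 2, 2, 3, 3, 4, 4, 5, 6, 7, 8, 9, 10]  # ids of df2 above

print(layout(df1_ids, 5))  # [[5, 10], [1, 6], [2, 7], [3, 8], [4, 9]]
print(layout(df2_ids, 5))  # same layout; duplicate ids land together
```

Note that hash partitioning does not produce contiguous groups like [id=1,id=2], [id=3,id=4], ...; that shape is what range partitioning produces.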

  2. If I use 'bucketBy' in the same way, will the corresponding buckets of both tables contain the same 'id' values?

  3. Will Spark send the matching partitions to the same executor? I mean, will the partition containing [id=1,id=2] from table 1 and the partition containing [id=1,id=2] from table 2 be sent to the same executor for the join?

If I'm missing something, or if you can recommend another way to join two big tables under the assumptions above, that would be very helpful.


2 Answers


Take a look at this answer.
TL;DR: If you only need to join them once, and that is the only reason for repartitioning, just join them directly.
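Some context for "just join them": when neither side can be broadcast, Spark's usual strategy for a large equi-join is a sort-merge join. It shuffles both sides by hashing the join key, sorts each partition by the key, and merges matching partitions, which is essentially the scheme the question describes, so repartitioning by hand first just adds work. Below is a toy, single-partition model of the merge step in plain Python (the function name is hypothetical; this is not Spark's implementation).

```python
def sort_merge_join(left, right, key):
    # Inner join of two lists of dicts: sort both sides on `key`,
    # then walk them in step, as the per-partition merge step does.
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right side...
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            # ...and pair it with every left row that has the same key.
            while i < len(left) and left[i][key] == lk:
                out.extend({**left[i], **r} for r in right[j:j_end])
                i += 1
            j = j_end
    return out

people = [{"id": 2, "name": "Lily"}, {"id": 1, "name": "David"}, {"id": 3, "name": "Dan"}]
prices = [{"id": 1, "price": 30.8}, {"id": 3, "price": 99.0},
          {"id": 1, "price": 40.3}, {"id": 2, "price": 100.0}]
result = sort_merge_join(people, prices, "id")
for row in result:
    print(row)
```

Sorting lets each side be streamed once per partition, which is why this strategy scales to inputs that do not fit in a single in-memory hash table.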


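Yes, it would have to be like that; otherwise the whole paradigm of JOINing would not be reliable. repartition(n, 'id') hash partitions on 'id', so with the same n a given id ends up at the same partition index in both tables.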

On the executor question: you actually mean the Worker, i.e. the machine running the Executor(s).

repartition on its own (without a column) would not be advisable, as it uses round-robin partitioning.
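A simplified model of why round-robin does not help a join, in plain Python with hypothetical helper names: round-robin placement depends on row order, so two tables with the same ids but a different number of rows end up with different id sets per partition, while hash placement depends only on the id value. (Spark's actual round-robin also randomizes the starting partition, which this sketch ignores.)

```python
from itertools import cycle

def round_robin(rows, n):
    # Like repartition(n) without columns: rows are dealt out in turn,
    # so where a row lands depends on its position, not its id.
    parts = [[] for _ in range(n)]
    for p, row in zip(cycle(range(n)), rows):
        parts[p].append(row)
    return parts

def by_hash(rows, n):
    # Like repartition(n, 'id'): where a row lands depends only on its id.
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[hash(row) % n].append(row)
    return parts

ids_a = [1, 2, 3, 4, 5, 6]
ids_b = [1, 1, 2, 3, 4, 5, 6]  # id 1 appears twice, as ids do in df2

print(round_robin(ids_a, 3))  # [[1, 4], [2, 5], [3, 6]]
print(round_robin(ids_b, 3))  # [[1, 3, 6], [1, 4], [2, 5]]
print(by_hash(ids_a, 3))      # [[3, 6], [1, 4], [2, 5]]
print(by_hash(ids_b, 3))      # [[3, 6], [1, 1, 4], [2, 5]]
```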

Range partitioning (repartitionByRange) works as well. I checked to be sure, but with the proviso that the partitioning values have the same distribution in both tables.
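A toy model of range partitioning in plain Python, with hypothetical boundaries and a hypothetical helper name: each id goes to the partition whose range contains it (bisect_left finds the first upper bound that is >= the id). With shared boundaries [2, 4, 6, 8], both tables get exactly the [id=1,id=2], [id=3,id=4], ... layout from the question. With different boundaries, as can happen when each table's boundaries are estimated from its own data, the alignment breaks, which is the reason for the proviso above.

```python
from bisect import bisect_left

def range_partition(ids, boundaries):
    # `boundaries` are sorted upper bounds for all partitions but the last;
    # an id goes to the first partition whose upper bound is >= the id.
    parts = [[] for _ in range(len(boundaries) + 1)]
    for i in ids:
        parts[bisect_left(boundaries, i)].append(i)
    return parts

df1_ids = list(range(1, 11))
df2_ids = [1, 1, 2, 2, 3, 3, 4, 4, 5, 6, 7, 8, 9, 10]
shared = [2, 4, 6, 8]  # hypothetical boundaries used by both tables

print(range_partition(df1_ids, shared))  # [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
print(range_partition(df2_ids, shared))  # [[1, 1, 2, 2], [3, 3, 4, 4], [5, 6], [7, 8], [9, 10]]

# Hypothetical boundaries derived from a different distribution break the alignment:
print(range_partition(df2_ids, [1, 3, 5, 8]))  # [[1, 1], [2, 2, 3, 3], [4, 4, 5], [6, 7, 8], [9, 10]]
```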

It all works on the premise of lazy evaluation.

bucketBy can be used, but it is intended more for persisting to disk (as a bucketed table) and reusing the data in a later application.
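A plain-Python sketch of the bucketing idea: bucketBy(numBuckets, 'id') assigns each row to a bucket by hashing the bucket column and records the bucket layout in the table metadata. In PySpark that looks roughly like df1.write.bucketBy(5, "id").saveAsTable("table1_bucketed"), where the table name is hypothetical. A later join of two tables bucketed the same way on 'id' can then pair bucket i with bucket i without reshuffling, but in general only when both tables use the same number of buckets. The helpers below are hypothetical, and hash() is a stand-in for Spark's hash function.

```python
def bucket_of(key, num_buckets):
    # Toy stand-in for the hash bucketBy applies to the bucket column.
    return hash(key) % num_buckets

def buckets(ids, num_buckets):
    # Distinct ids stored in each bucket.
    out = [set() for _ in range(num_buckets)]
    for i in ids:
        out[bucket_of(i, num_buckets)].add(i)
    return [sorted(b) for b in out]

ids = list(range(1, 11))
t1 = buckets(ids, 5)  # table 1 written with 5 buckets
t2 = buckets(ids, 5)  # table 2 written with 5 buckets
t3 = buckets(ids, 4)  # a table written with 4 buckets

print(t1 == t2)       # True: bucket i of t1 pairs with bucket i of t2
print(t1[1], t3[1])   # [1, 6] [1, 5, 9]: mismatched bucket counts do not line up
```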

Again, you need not worry about helping Spark here: lazy evaluation gives the Optimizer the chance to work it all out, including which Worker to allocate work to. But that happens at a lower level of detail and abstraction.