I have failed to figure out how a Spark SQL join operation actually works. I have read a pretty extensive explanation, but it doesn't shed light on a few questions.
Example
For example, you have two database tables saved in a format Spark can read (Parquet or any other format), and you have to join them on some column:
SELECT t1.column_name_1
FROM parquet.`data/table1.parquet` as t1
LEFT JOIN parquet.`data/table2.parquet` as t2
ON t2.column_name_2 = t1.column_name_1
WHERE t2.column_name_2 is NULL
LIMIT 1
I am going to launch this query with sparkSession.sql(joinQuery).
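For reference, a minimal Scala sketch of how I launch it (the session builder settings are placeholders, not my actual configuration):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: build a session and run the join as a SQL string.
// The app name and master are placeholders for illustration only.
val sparkSession = SparkSession.builder()
  .appName("join-example")
  .master("local[*]")
  .getOrCreate()

val joinQuery =
  """SELECT t1.column_name_1
    |FROM parquet.`data/table1.parquet` as t1
    |LEFT JOIN parquet.`data/table2.parquet` as t2
    |ON t2.column_name_2 = t1.column_name_1
    |WHERE t2.column_name_2 is NULL
    |LIMIT 1""".stripMargin

sparkSession.sql(joinQuery).show()
```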
Questions
- How will Spark shuffle the table1.parquet and table2.parquet RDDs? As I understand it, Spark needs some key by which it performs the shuffle. What would be the key if column_name_1 and column_name_2 each have 1,000,000 unique values? How many unique keys (partitions) will I get? How many shuffles will I get? (One way to inspect what Spark actually plans is sketched right after this list.)
- Is it possible to make Spark NOT fetch the whole table1.parquet and table2.parquet RDDs into memory?
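A minimal sketch of one way to inspect the planned join without executing it, assuming the same sparkSession and joinQuery as above:

```scala
// Print the logical and physical plans instead of executing the query.
// In the physical plan, "Exchange hashpartitioning(...)" nodes mark shuffles
// and show the partitioning key and the number of partitions, which defaults
// to spark.sql.shuffle.partitions (200 unless configured otherwise).
sparkSession.sql(joinQuery).explain(true)
```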
Why I am suspicious about filtering
There is a pretty trivial solution: just filter the DataFrames before joining, and you will keep everything in RAM. But I'm not sure this will perform well in my case.
Let's say filtering allows you to retrieve table1_subset1 and table1_subset2 from table1. Now, to get the same join results, you need to do more joins. I mean:
table1 JOIN table2 = table1_subset1 JOIN table2 + table1_subset2 JOIN table2
The same applies if I also filter table2:
table1 JOIN table2 = table1_subset1 JOIN table2_subset1 + table1_subset2 JOIN table2_subset1 +
table1_subset1 JOIN table2_subset2 + table1_subset2 JOIN table2_subset2
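To make the decomposition above concrete, a sketch of what the subset joins would look like (the split predicate, paths, and column names are hypothetical placeholders):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical sketch: split table1 into two disjoint subsets, left-join each
// subset with table2 separately, and union the results to reproduce the rows
// of the single left join.
val table1: DataFrame = sparkSession.read.parquet("data/table1.parquet")
val table2: DataFrame = sparkSession.read.parquet("data/table2.parquet")

// Placeholder split predicate; any partitioning into disjoint subsets works.
val subset1 = table1.filter(col("column_name_1") < 500000)
val subset2 = table1.filter(col("column_name_1") >= 500000)

def leftJoinWithTable2(subset: DataFrame): DataFrame =
  subset.join(table2, subset("column_name_1") === table2("column_name_2"), "left")

// Each of these joins can trigger its own shuffle of table2, which is exactly
// the performance concern described here.
val combined = leftJoinWithTable2(subset1).union(leftJoinWithTable2(subset2))
```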
Now I have to join about 50 pairs of huge tables, each of which would have to be split into multiple chunks (subsets), let's say 5 chunks each. So instead of 50 joins I will get 50 * 5 * 5 = 1250 filter-and-join operations between chunks, where each chunk is 5 times smaller than the original table (RDD).
Am I right to suppose that performance will degrade a lot? Or is Spark clever enough to perform the same number of shuffles?