To combine records from two or more Spark DataFrames, you need a join.
If your data is not partitioned or bucketed appropriately, the join becomes a shuffle join, in which every node talks to every other node and exchanges rows so that all rows sharing a given join key (or set of keys) end up on the same node. These joins are expensive because the network can become congested with traffic.
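To make the data movement concrete, here is a toy, single-process model of a shuffle join in plain Scala. It does not use Spark's API. The object name ShuffleJoinSketch and its helpers are hypothetical; "nodes" are just vectors of (key, payload) rows, and a row's destination is assumed to be its key's non-negative hash modulo the node count (Spark uses its own hash function). The function also counts how many rows had to cross the "network".

```scala
// Toy model of a shuffle join; plain Scala collections, not Spark's API.
object ShuffleJoinSketch {
  type Row = (Int, String) // (join key, payload)

  // Hypothetical rule for which node owns a key: non-negative hash mod node count.
  def targetNode(key: Int, numNodes: Int): Int = {
    val raw = key.hashCode % numNodes
    if (raw < 0) raw + numNodes else raw
  }

  // Shuffle: each node sends every row to the node that owns its key.
  // Returns the regrouped nodes and the number of rows that changed node.
  def shuffle(nodes: Vector[Vector[Row]]): (Vector[Vector[Row]], Int) = {
    val n = nodes.length
    val tagged: Vector[(Int, Int, Row)] =
      for ((rows, src) <- nodes.zipWithIndex; row <- rows)
        yield (src, targetNode(row._1, n), row)
    val moved = tagged.count { case (src, dst, _) => src != dst }
    val regrouped = Vector.tabulate(n)(i => tagged.collect { case (_, dst, row) if dst == i => row })
    (regrouped, moved)
  }

  // Both sides must be spread over the same number of nodes. After the
  // shuffle, rows with equal keys share a node, so each node joins locally.
  def shuffleJoin(left: Vector[Vector[Row]], right: Vector[Vector[Row]]): (Vector[(Int, String, String)], Int) = {
    require(left.length == right.length, "both sides need the same node count")
    val (l, movedLeft) = shuffle(left)
    val (r, movedRight) = shuffle(right)
    val joined = l.zip(r).flatMap { case (lRows, rRows) =>
      for ((k1, a) <- lRows; (k2, b) <- rRows if k1 == k2) yield (k1, a, b)
    }
    (joined, movedLeft + movedRight)
  }
}
```

Any node may send rows to any other node, which is the all-to-all traffic described above; the moved count grows with the amount of data that is not already sitting on the node that owns its key.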

The shuffle can be avoided if:
- Both DataFrames have a known partitioner on the join key or are bucketed on it.
- One of the datasets is small enough to fit in memory, in which case we can do a broadcast hash join.
Partitioning
If you partition your data correctly before a join, execution can be much more efficient: even if a shuffle is planned, Spark can avoid it when the matching data from the two DataFrames is already located on the same machine.
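import org.apache.spark.sql.functions.col
// repartition returns a new DataFrame (DataFrames are immutable), so keep the result
val left = df1.repartition(col("id"))
val right = df2.repartition(col("id"))
// you can optionally specify the number of partitions, e.g.:
// val left = df1.repartition(10, col("id"))
// Join the DataFrames on the id column; joining on the column name
// keeps a single id column in the output DataFrame
val joined = left.join(right, "id")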
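Why co-partitioning helps can be shown with another toy model in plain Scala (again not Spark's API). The object CoPartitionedJoinSketch and its helpers are hypothetical: hashPartition stands in, only conceptually, for repartition(n, col("id")) (Spark uses a different hash function), and partition i of each side is assumed to live on node i. Because equal keys always hash to the same partition index, partition i of one side only ever has to meet partition i of the other.

```scala
// Toy model of joining two already co-partitioned datasets; not Spark's API.
object CoPartitionedJoinSketch {
  type Row = (Int, String) // (join key, payload)

  // Hypothetical partitioning rule: non-negative hash mod partition count.
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Conceptual stand-in for hash repartitioning on the key column.
  def hashPartition(rows: Seq[Row], numPartitions: Int): Vector[Vector[Row]] =
    Vector.tabulate(numPartitions)(i => rows.filter(r => partitionOf(r._1, numPartitions) == i).toVector)

  // True when every row sits in the partition its key hashes to.
  def isCoPartitioned(parts: Vector[Vector[Row]]): Boolean =
    parts.zipWithIndex.forall { case (rows, i) => rows.forall(r => partitionOf(r._1, parts.length) == i) }

  // Partition i of the left side is joined only with partition i of the
  // right side, so no row has to move to another node.
  def localJoin(left: Vector[Vector[Row]], right: Vector[Vector[Row]]): Vector[(Int, String, String)] = {
    require(left.length == right.length && isCoPartitioned(left) && isCoPartitioned(right),
      "both sides must use the same hash partitioning")
    left.zip(right).flatMap { case (l, r) =>
      val byKey = r.groupBy(_._1)
      l.flatMap { case (k, a) => byKey.getOrElse(k, Vector.empty).map { case (_, b) => (k, a, b) } }
    }
  }
}
```

The require check captures the precondition: the trick only works when both sides use the same partitioning rule and the same number of partitions.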
Broadcast hash join
When one of the DataFrames is small enough to fit into the memory of a single worker node, we can optimize the join.
Spark replicates the small DataFrame onto every worker node in the cluster (whether the cluster is one machine or many). This sounds expensive, but it avoids all-to-all communication during the join. Instead, the communication happens only once, at the beginning, and then each worker node performs its part of the join without waiting for or communicating with any other worker node.
import org.apache.spark.sql.functions.broadcast
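// Explicit broadcast hint. Spark also broadcasts automatically when the smaller side's
// estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default).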
df1.join(broadcast(df2), "id")
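The mechanics of a broadcast hash join can be sketched in plain Scala as well (this is a conceptual model, not Spark's implementation). The object BroadcastJoinSketch is hypothetical: the small side is turned into a hash table once, that one table stands in for the copy shipped to every node, and each node probes it with its own local rows of the big side.

```scala
// Toy model of a broadcast hash join; plain Scala collections, not Spark's API.
object BroadcastJoinSketch {
  type Row = (Int, String) // (join key, payload)

  def broadcastHashJoin(bigSide: Vector[Vector[Row]], smallSide: Seq[Row]): Vector[Vector[(Int, String, String)]] = {
    // Built once from the small side; conceptually copied to every node (the "broadcast").
    val hashTable: Map[Int, Seq[String]] =
      smallSide.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    // Each node joins only its local partition of the big side against the table,
    // so the big side's rows never leave their node.
    bigSide.map { localRows =>
      for {
        (k, a) <- localRows
        b <- hashTable.getOrElse(k, Seq.empty)
      } yield (k, a, b)
    }
  }
}
```

The output keeps the big side's layout: each node's result comes only from its own rows, which is why no node waits on any other once the table has been distributed.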
