I understand the concept of the broadcast optimization.
When one side of the join has a small amount of data, it's better to ship just the small side to every executor instead of shuffling both sides.
But why isn't it possible to do this exchange using only the executors? Why does the driver need to be involved?
If each executor held a hash table mapping the records between executors, I think it should work.
In Spark's current broadcast implementation, the data is first collected to the driver and then distributed to the executors, and that collect step on the driver is the bottleneck I would like to avoid.
Any ideas on how to achieve a similar optimization without the bottleneck of the driver's memory?
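To make the idea concrete, here is a minimal plain-Python sketch of a broadcast hash join (no Spark involved; all names are illustrative): the small side is turned into a hash table and a copy is handed to every "executor", so each partition of the large side can be joined locally without ever being shuffled.

```python
# Plain-Python sketch of a broadcast hash join: build a hash table from the
# small side once, then probe it locally for every partition of the large side.

def broadcast_hash_join(large_partitions, small_side):
    # The broadcast step: the small side becomes a hash table (key -> value)
    # that each executor would receive a copy of.
    hash_table = {k: v for k, v in small_side}
    joined = []
    for partition in large_partitions:   # each large partition stays in place
        for key, value in partition:
            if key in hash_table:        # local probe, no network exchange
                joined.append((key, value, hash_table[key]))
    return joined

large = [[(1, "a"), (2, "b")], [(3, "c"), (1, "d")]]  # two partitions
small = [(1, "x"), (3, "y")]
print(broadcast_hash_join(large, small))
# → [(1, 'a', 'x'), (3, 'c', 'y'), (1, 'd', 'x')]
```

The question is about where the hash table is assembled: in current Spark it is materialized on the driver before being broadcast, rather than built and exchanged by the executors themselves.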
1 Answer
You are correct: the current implementation requires collecting the data to the driver before sending it out to the executors.
There is already a JIRA ticket, SPARK-17556, addressing exactly what you are proposing:
"Currently in Spark SQL, in order to perform a broadcast join, the driver must collect the result of an RDD and then broadcast it. This introduces some extra latency. It might be possible to broadcast directly from executors."
I have copied the proposed solution from an attached document to make this answer self-contained:
"To add a broadcast method to RDD to perform broadcast from the executors, we need some supporting work, as follows:
- Construct the BroadCastId from the driver; BroadCastManager will supply a method to do this.
// Called from driver to create new broadcast id
def newBroadcastId: Long = nextBroadcastId.getAndIncrement()
- BroadCastManager should be able to create a broadcast with a specified id and a persist tag marking it as an executor broadcast, whose data will be backed up on HDFS.
- In TorrentBroadcast.writeBlocks, write the block to HDFS; readBlocks reads the block from local, remote, or HDFS, in that order of priority.
- When constructing the Broadcast, we can control whether to upload the broadcast data block.
- BroadCastManager exposes an API to put broadcast data into the block manager."
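The local → remote → HDFS read order proposed for readBlocks can be sketched in a few lines of plain Python. This is only an illustration of the fallback logic, not Spark's API; the three stores are modeled as dicts and the function name is made up.

```python
# Sketch of the proposed block-read fallback: try the local block store first,
# then remote executors, then the HDFS backup. Stores are modeled as dicts.

def read_block(block_id, local, remote, hdfs):
    for store_name, store in (("local", local), ("remote", remote), ("hdfs", hdfs)):
        if block_id in store:
            return store_name, store[block_id]
    raise KeyError(f"block {block_id} not found in any store")

local = {"b0": b"cached-bytes"}
remote = {"b1": b"peer-bytes"}
hdfs = {"b1": b"hdfs-bytes", "b2": b"backup-bytes"}

print(read_block("b0", local, remote, hdfs))  # found locally
print(read_block("b1", local, remote, hdfs))  # falls back to a remote peer
print(read_block("b2", local, remote, hdfs))  # only the HDFS backup has it
```

The HDFS backup is what makes the scheme work without the driver: even if the executor that produced a block goes away, any other executor can still recover the broadcast data from the last tier.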