TL;DR: I'm also still learning where this data size metric comes from. It most likely is only an estimated size for the operation, so it may not reflect the actual size of the data. I wouldn't worry about it too much for now.
Full version:
Update: came back to correct some mistakes. The previous version of this answer was lacking depth, so I'll try to dig as deep as I can (I'm still relatively new to answering questions).
Update 2: rephrased a few things and removed an overdone joke (sorry).
OK, this is going to be long, but I think this metric is not really the direct size of the data.
To begin with, I did a test run to try to reproduce the results, with 200 executors and 4 cores:
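I can't paste the whole job here, so here is a minimal sketch of the setup; the paths, the join column `key` and the output location are placeholders I made up, not the OP's data:

```scala
// Minimal sketch of the test setup (paths, column names and sizes are placeholders)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-datasize-test").getOrCreate()

val big   = spark.read.parquet("/data/big_table")    // probe side
val small = spark.read.parquet("/data/small_table")  // build side, ~1.2 GB on disk in my test

// Force a broadcast hash join so a BroadcastExchange node shows up in the SQL tab of the UI
big.join(broadcast(small), "key")
  .write.mode("overwrite").parquet("/tmp/broadcast_test_out")
```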
This returned the following results:
Now something interesting shows up: the dataSize for my test is around 1.2 GB, not 3.2 GB. That led me to read the Spark source code.
When I go to GitHub, I see that the 4 numbers on the BroadcastExchange node correspond to this:
First stop, BroadcastExchangeExec: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
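Paraphrasing from memory (not a verbatim copy of the file, and the exact labels may differ between Spark versions), the metrics declared there look roughly like this:

```scala
// Sketch of the metrics map in BroadcastExchangeExec (names recalled from memory, may differ by version)
override lazy val metrics = Map(
  "dataSize"      -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
  "collectTime"   -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
  "buildTime"     -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
  "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"))
```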
Data size corresponds to this part:
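Again a paraphrase from memory of the build logic, not a verbatim copy, just to show where the number comes from:

```scala
// Sketch of how the dataSize metric is filled in (paraphrased; details vary by Spark version)
val relation = mode.transform(input, Some(numRows))  // mode is the BroadcastMode of the exchange
val dataSize = relation match {
  case map: HashedRelation => map.estimatedSize      // <- the number the UI shows
  case arr: Array[InternalRow] =>
    arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
}
longMetric("dataSize") += dataSize
```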
The relation val here is produced by a HashedRelationBroadcastMode (the broadcast mode of this exchange).
Next, go to HashedRelation: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
Since we pass Some(numRows) (the number of rows of the DataFrame), the match takes the first case (lines 926-927).
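Paraphrasing HashedRelationBroadcastMode.transform (signature and parameters from memory; they differ a bit between versions):

```scala
// Sketch: with a row-count hint, transform goes straight to the HashedRelation factory (paraphrased)
override def transform(rows: Iterator[InternalRow], sizeHint: Option[Long]): HashedRelation = {
  sizeHint match {
    case Some(numRows) => HashedRelation(rows, canonicalized.key, numRows.toInt)  // our case
    case None          => HashedRelation(rows, canonicalized.key)
  }
}
```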
Go back to the constructor-like part of HashedRelation (the apply method of its companion object):
Since the join here is on a hashed int key, not a single Long, the join uses UnsafeHashedRelation.
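Paraphrased, the dispatch in HashedRelation.apply looks roughly like this (extra parameters omitted):

```scala
// Sketch of HashedRelation.apply's dispatch (paraphrased; extra parameters omitted)
if (key.length == 1 && key.head.dataType == LongType) {
  LongHashedRelation(input, key, sizeEstimate, mm)    // single long-typed key
} else {
  UnsafeHashedRelation(input, key, sizeEstimate, mm)  // everything else, including our case
}
```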
To UnsafeHashedRelation:
Now we go to the place in UnsafeHashedRelation that determines the estimated size; I found this:
Focusing on the estimated size, our target is the binaryMap object (later on the code assigns map = binaryMap).
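If I read it correctly, the part that matters for the metric boils down to this:

```scala
// Sketch: UnsafeHashedRelation delegates its size estimate to the underlying map
override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption
```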
Then it goes here:
binaryMap is a BytesToBytesMap, which corresponds to this: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Jump to the getTotalMemoryConsumption method (the one backing estimatedSize), and we get:
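The real method is Java; paraphrased in Scala, it does roughly this (if I read it right, the number is the memory reserved by the map, not the bytes actually written into it):

```scala
// Sketch of BytesToBytesMap.getTotalMemoryConsumption (the original is Java; this is a paraphrase):
// the "size" is the memory reserved by the map, i.e. all allocated data pages
// plus the pointer array, rather than the bytes actually written into them.
def getTotalMemoryConsumption: Long = {
  var total = 0L
  for (dataPage <- dataPages) {  // dataPages are fixed-size MemoryBlocks
    total += dataPage.size()
  }
  total + (if (longArray != null) longArray.memoryUsed() else 0L)
}
```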
This is where I'm currently stuck. Just my two cents: I don't think it is a bug, just the estimated size of the broadcast relation, and since it is an estimate it doesn't have to be very accurate (although, to be honest, the difference in this case is surprisingly big).
In case you want to keep playing with the dataSize on this one, one approach is to directly influence the binaryMap object by changing the inputs to its constructor. Look back at this:
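Paraphrasing the construction inside UnsafeHashedRelation.apply (parameter shapes from memory):

```scala
// Sketch of how binaryMap is built (paraphrased from UnsafeHashedRelation.apply):
// the task memory manager and pageSizeBytes passed in here are driven by
// MEMORY_OFFHEAP_ENABLED and the buffer page size, independently of your
// executor or core counts.
val binaryMap = new BytesToBytesMap(
  taskMemoryManager,
  (sizeEstimate * 1.5 + 1).toInt,  // initial capacity, over-allocated to limit hash collisions
  pageSizeBytes)
```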
There are two knobs that can be configured: MEMORY_OFFHEAP_ENABLED and BUFFER_PAGESIZE. Perhaps you can experiment with those two configurations at spark-submit time (a sketch follows below). This is also why the BroadcastExchange size doesn't change even when you change the number of executors and cores.
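A sketch of how that experiment could look; the config keys are recalled from memory (spark.memory.offHeap.enabled, spark.memory.offHeap.size, spark.buffer.pageSize), so double-check them against the docs for your Spark version. The same keys can also be passed as --conf flags to spark-submit:

```scala
// Hypothetical experiment: tweak the two knobs that feed into the BytesToBytesMap sizing
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("broadcast-datasize-experiment")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")  // required when off-heap is enabled
  .config("spark.buffer.pageSize", "2m")
  .getOrCreate()
```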
So in conclusion, I think dataSize is an estimate produced by a fairly involved mechanism (I'm still digging into it and would welcome someone with more expertise explaining it), not directly the size you mentioned in the first image (140 MB). As such, it's probably not worth spending much time trying to reduce the overhead suggested by this particular metric.
Some bonus related stuff:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-BroadcastExchangeExec.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UnsafeRow.html