18 votes

I am doing a broadcast join of two tables A and B. B is a cached table created with the following Spark SQL:

create table B as select segment_ids_hash from stb_ranker.c3po_segments
      where
        from_unixtime(unix_timestamp(string(dayid), 'yyyyMMdd')) >= CAST('2019-07-31 00:00:00.000000000' AS TIMESTAMP)
      and
        segmentid_check('(6|8|10|12|14|371|372|373|374|375|376|582|583|585|586|587|589|591|592|594|596|597|599|601|602|604|606|607|609|610|611|613|615|616)', seg_ids) = true;

cache table B;

The column 'segment_ids_hash' is of integer type and the result contains 36.4 million records. The cached table size is about 140 MB, as shown below: [screenshot: cached table size]

Then I did the join as follows:

select count(*) from A broadcast join B on A.segment_ids_hash = B.segment_ids_hash

[screenshot: query plan with BroadcastExchange]

Here the broadcast exchange data size is about 3.2 GB.

My question is: why is the broadcast exchange data size (3.2 GB) so much bigger than the raw data size (~140 MB)? What are the overheads? Is there any way to reduce the broadcast exchange data size?

Thanks

What is your cluster size and what serialization are you using? Please update the question. When broadcasting a 140 MB variable to 100 references, it should be 14 GB. – vaquar khan
Spark broadcast collects the data to the driver and then dispatches it to each executor; the size shown is the total sent over the wire, I think. – sramalingam24
@sramalingam24 I've tested with different numbers of executors and the broadcast byte size does not change. – seiya
Do you know which one is being broadcast? You can check the generated query plan; the number of partitions of the other table would also play a role. – sramalingam24

1 Answer

6 votes

TL;DR: I'm also still learning where this data size metric comes from. It is probably only the estimated size of the operation and might not reflect the actual size of the data, so don't worry about it too much for now.

Full version:

Update: came back to correct some mistakes. I see that the previous answer was lacking some depth, so I'll try to dig deeper as best I can (I'm still relatively new to answering questions).

Update 2: rephrased and removed an overdone joke (sorry).

OK, so this might get long, but I think this metric is not really the direct size of the data.

To begin with, I made a test run to reproduce the results with 200 executors and 4 cores:

[screenshot]

This returned the following results: [screenshot]
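For anyone who wants to poke at this themselves, here is a rough sketch of the kind of reproduction I mean (table shapes and key distribution are made up, and you need a running cluster; only the integer join key mirrors the question):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("broadcast-datasize-repro").getOrCreate()
    import spark.implicits._

    // ~36.4M integer keys, roughly the shape of B's segment_ids_hash column
    spark.range(36400000L)
      .select(($"id" % 1000000).cast("int").as("segment_ids_hash"))
      .createOrReplaceTempView("B")
    spark.sql("CACHE TABLE B")

    // a bigger table to join against (shape also made up)
    spark.range(100000000L)
      .select(($"id" % 1000000).cast("int").as("segment_ids_hash"))
      .createOrReplaceTempView("A")

    // force the broadcast of B, then check the SQL tab for the BroadcastExchange dataSize metric
    val q = spark.sql(
      "SELECT /*+ BROADCAST(B) */ count(*) FROM A JOIN B ON A.segment_ids_hash = B.segment_ids_hash")
    q.explain()
    q.show()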

Now I see something interesting: the dataSize for my test is around 1.2 GB, not 3.2 GB. This led me to read the Spark source code.

When I go to GitHub, I see that the four numbers in BroadcastExchange correspond to this. First link, BroadcastExchangeExec: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala [screenshot]

Data size corresponds to this one:

[screenshot] The relation val here appears to come from a HashedRelationBroadcastMode.
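If it helps, here is how I read that part, as a runnable mock (paraphrased; the real code uses Spark-internal classes and differs between versions): for a hash join, the dataSize metric is taken from the built HashedRelation's estimatedSize, i.e. the memory consumed by the in-memory map, not the bytes of the cached table it was built from.

    // Mock only: MockHashedRelation stands in for Spark's HashedRelation.
    trait MockHashedRelation { def estimatedSize: Long }

    final case class MockUnsafeHashedRelation(mapMemoryBytes: Long) extends MockHashedRelation {
      // estimatedSize reports the memory held by the underlying map
      override def estimatedSize: Long = mapMemoryBytes
    }

    // the dataSize metric is filled from the broadcast relation's estimatedSize
    def dataSizeMetric(relation: MockHashedRelation): Long = relation.estimatedSize

    // a map that allocated ~3.2 GB of pages reports ~3.2 GB here,
    // regardless of the ~140 MB of raw column data that went into it
    println(dataSizeMetric(MockUnsafeHashedRelation(3200L * 1024 * 1024)))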

Go to HashedRelation, https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala: [screenshot]

Since we have Some(numRows) (the number of rows of the DataFrame), the match uses the first case (lines 926-927).

Go back to the constructor-like part of HashedRelation: [screenshot]

Since the join key is a hashed int, its type is not Long, so the join uses UnsafeHashedRelation.
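Paraphrased as a runnable sketch (the real dispatch is in HashedRelation's apply; names here are simplified and the exact signature varies by Spark version): a single LongType key gets the specialized LongHashedRelation, and everything else falls back to UnsafeHashedRelation, which is where this int key lands.

    import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

    // Simplified mock of the choice made when building the broadcast relation
    def pickRelation(keyTypes: Seq[DataType]): String =
      if (keyTypes.length == 1 && keyTypes.head == LongType) "LongHashedRelation"
      else "UnsafeHashedRelation"

    println(pickRelation(Seq(IntegerType))) // UnsafeHashedRelation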

On to UnsafeHashedRelation:

[screenshot]

Now we go to the place in UnsafeHashedRelation that determines the estimated size, and I found this:

[screenshot]

Focusing on the estimated size, our target is the binaryMap object (later on, the code assigns map = binaryMap).

Then it goes here: [screenshot]

binaryMap is a BytesToBytesMap, which corresponds to this: https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

Jump to the getTotalMemoryConsumption method (the one that backs estimatedSize), and we get:

[screenshot]
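As I read it, the method just sums the allocated data pages plus the pointer array; here is a runnable paraphrase with mock types (not the real org.apache.spark.unsafe classes):

    // total = sum of all allocated data-page sizes + the long array holding
    // key addresses/hashes. Both are allocated memory, so the result can be much
    // larger than the raw bytes of the records stored in the map.
    final case class MockMemoryBlock(size: Long)

    def getTotalMemoryConsumption(
        dataPages: Seq[MockMemoryBlock],
        longArray: Option[MockMemoryBlock]): Long =
      dataPages.map(_.size).sum + longArray.map(_.size).getOrElse(0L)

    // arbitrary example: fifty 64 MB pages plus a 1 GB pointer array ≈ 4.2 GB reported
    println(getTotalMemoryConsumption(
      Seq.fill(50)(MockMemoryBlock(64L * 1024 * 1024)),
      Some(MockMemoryBlock(1024L * 1024 * 1024))))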

This is my current dead end. Just my two cents: I don't think it is a bug, just the estimated size of the join, and since it is an estimate I don't think it has to be very accurate (though, to be honest, it is weird in this case because the difference is so big).
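For a sense of why the estimate can dwarf the ~140 MB of column data, here is a very rough back-of-envelope; every per-row constant below is my own guess for illustration, not something measured from Spark:

    // All constants are assumptions for illustration only.
    val rows           = 36400000L // number of records in B
    val keyRowBytes    = 16L       // assumed: UnsafeRow with one int field (8-byte bitmap word + 8-byte slot)
    val valueRowBytes  = 16L       // assumed: the value row is the same single-column row
    val recordOverhead = 8L        // assumed: per-record length prefixes / alignment inside a page
    val pointerBytes   = 32L       // assumed: two 8-byte slots per key in the pointer array, with ~2x capacity headroom

    val estimate = rows * (keyRowBytes + valueRowBytes + recordOverhead + pointerBytes)
    println(f"${estimate / 1e9}%.1f GB") // ≈ 2.6 GB, the same order of magnitude as the 3.2 GB metric

The only point is that per-record and hash-map overhead multiplied by 36.4 million rows easily reaches gigabytes, even when the raw column data is small.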

In case you want to keep playing with the dataSize here, one approach is to directly influence the binaryMap object by modifying the inputs to its constructor. Look back at this:

[screenshot]

There are two variables that can be configured: MEMORY_OFFHEAP_ENABLED and BUFFER_PAGESIZE. Perhaps you can experiment with those two configurations during spark-submit. That is also why the BroadcastExchange size doesn't change even when you change the number of executors and cores.
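For example, a minimal sketch of setting those at session creation; the config keys below are my assumption of what MEMORY_OFFHEAP_ENABLED and BUFFER_PAGESIZE map to (spark.memory.offHeap.enabled / spark.memory.offHeap.size and spark.buffer.pageSize), so verify them against your Spark version:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("broadcast-size-experiment")
      .config("spark.memory.offHeap.enabled", "true") // assumed key for MEMORY_OFFHEAP_ENABLED
      .config("spark.memory.offHeap.size", "4g")      // required once off-heap is enabled
      .config("spark.buffer.pageSize", "2m")          // assumed key for the allocation page size
      .getOrCreate()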

So, in conclusion, I think the data size is an estimate generated by a fascinating mechanism (one I'm also waiting for someone with more expertise to explain, as I keep digging into it), not directly the 140 MB size you mentioned from the first image. As such, it is probably not worth spending much time trying to reduce the overhead of this particular metric.

Some bonus related stuff:

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-BroadcastExchangeExec.html

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UnsafeRow.html