I posted this question on the Spark user mailing list but received no response, so I am asking it here again.
We have a use case where we need to do a Cartesian join, and for some reason we are not able to get it to work with the Dataset API.
We have two datasets:
- one dataset with 2 string columns, say c1 and c2. It is a small dataset with ~1 million records. Both columns are 32-character strings, so the whole dataset should be less than 500 MB. We broadcast this dataset.
- the other dataset is a little bigger, with ~10 million records
import org.apache.spark.sql.functions.broadcast
import spark.implicits._

// Small dataset of (c1, c2) ranges; broadcast in the join below.
val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2")
ds1.count

// Larger dataset, ~10 million rows.
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts")
ds2.count

// Cross join with a broadcast hint; keep rows where c11 falls in [c1, c2].
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count
If I implement this using the RDD API, where I broadcast the data in ds1 and then filter the data in ds2, it works fine.
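Roughly like this (a sketch from memory; the column position of c11 and the variable names are illustrative, not the exact code):

// Broadcast the (c1, c2) pairs, then filter ds2's rows against them.
val rangePairs = ds1.collect().map(r => (r.getString(0), r.getString(1)))
val rangePairsBC = sc.broadcast(rangePairs)
val filtered = ds2.rdd.filter { row =>
  val c11 = row.getString(0)  // assumes c11 is the first column of ds2
  rangePairsBC.value.exists { case (lo, hi) => lo <= c11 && c11 <= hi }
}
filtered.count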
I have confirmed that the broadcast is successful:
2019-02-14 23:11:55 INFO CodeGenerator:54 - Code generated in 10.469136 ms
2019-02-14 23:11:55 INFO TorrentBroadcast:54 - Started reading broadcast variable 29
2019-02-14 23:11:55 INFO TorrentBroadcast:54 - Reading broadcast variable 29 took 6 ms
2019-02-14 23:11:56 INFO CodeGenerator:54 - Code generated in 11.280087 ms
Query Plan:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1#68 <= c11#13) && (c11#13 <= c2#69))
:- *Project []
: +- *Filter isnotnull(_c0#0)
: +- *FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5] Batched: false, Format: CSV, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string,_c4:string,_c5:string>
+- BroadcastExchange IdentityBroadcastMode
+- *Project [c1#68, c2#69]
+- *Filter (isnotnull(c1#68) && isnotnull(c2#69))
+- *FileScan csv [c1#68,c2#69] Batched: false, Format: CSV, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(c2)], ReadSchema: struct
After that, the stage makes no progress.
I then updated the code to broadcast the data in ds1 and do the join inside mapPartitions on ds2.
// Collect the small set of (c1, c2) ranges to the driver and broadcast it.
val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)
I then used rangesBC inside mapPartitions to identify the range each row in ds2 belongs to (sketched below). That job completes in 3 hours, while the crossJoin version does not complete even after 24 hours. This suggests the query optimizer is not doing what I want it to do.
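A rough sketch of that mapPartitions step (the matching logic and column positions are simplifications, not the exact code):

// Tag each ds2 row with the first (c1, c2) range containing its c11 value.
val tagged = ds2.rdd.mapPartitions { rows =>
  val rs = rangesBC.value  // Array[Row] of (c1, c2) pairs
  rows.flatMap { row =>
    val c11 = row.getString(0)  // assumes c11 is the first column of ds2
    rs.collectFirst {
      case r if r.getString(0) <= c11 && c11 <= r.getString(1) =>
        (r.getString(0), r.getString(1), c11)
    }
  }
}
tagged.count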
What am I doing wrong? Any pointers would be appreciated. Thank you!