3 votes

I have posted this question on the Spark user forum but received no response, so I am asking it here again.

We have a use case where we need to do a Cartesian join, and for some reason we are not able to get it to work with the Dataset API.

We have two datasets:

  • one dataset with two string columns, say c1 and c2. It is a small dataset with ~1 million records. Both columns are 32-character strings, so the whole dataset should be less than 500 MB.

    We broadcast this dataset.

  • the other dataset is a little bigger, with ~10 million records:
// spark-shell imports these automatically; they are needed in a compiled job
import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val ds1 = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2") // small side, ~1M rows
ds1.count
val ds2 = spark.read.format("csv").load(<s3-location>).toDF("c11", "c12", "c13", "c14", "c15", "ts") // larger side, ~10M rows
ds2.count
// broadcast the small side and keep the rows where c11 falls within [c1, c2]
ds2.crossJoin(broadcast(ds1)).filter($"c1" <= $"c11" && $"c11" <= $"c2").count

If I implement it using the RDD API, where I broadcast the data in ds1 and then filter the data in ds2, it works fine.

I have confirmed the broadcast is successful.

2019-02-14 23:11:55 INFO CodeGenerator:54 - Code generated in 10.469136 ms
2019-02-14 23:11:55 INFO TorrentBroadcast:54 - Started reading broadcast variable 29
2019-02-14 23:11:55 INFO TorrentBroadcast:54 - Reading broadcast variable 29 took 6 ms
2019-02-14 23:11:56 INFO CodeGenerator:54 - Code generated in 11.280087 ms

Query Plan:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross, ((c1#68 <= c11#13) && (c11#13 <= c2#69))
:- *Project []
:  +- *Filter isnotnull(_c0#0)
:     +- *FileScan csv [_c0#0,_c1#1,_c2#2,_c3#3,_c4#4,_c5#5] Batched: false, Format: CSV, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct<_c0:string,_c1:string,_c2:string,_c3:string,_c4:string,_c5:string>
+- BroadcastExchange IdentityBroadcastMode
   +- *Project [c1#68, c2#69]
      +- *Filter (isnotnull(c1#68) && isnotnull(c2#69))
         +- *FileScan csv [c1#68,c2#69] Batched: false, Format: CSV, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(c2)], ReadSchema: struct

Then the stage does not progress.

I updated the code to broadcast ds1 and then do the join inside mapPartitions over ds2:

// collect the ~1M (c1, c2) range rows to the driver and broadcast them to every executor
val ranges = spark.read.format("csv").option("header", "true").load(<s3-location>).select("c1", "c2").collect
val rangesBC = sc.broadcast(ranges)

I then used rangesBC inside mapPartitions to identify the range each row in ds2 belongs to. That job completes in 3 hours, while the other job does not complete even after 24 hours. This suggests the query optimizer is not doing what I want it to do.
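For completeness, a minimal sketch of what that mapPartitions lookup might look like, assuming the broadcast Array[Row] from above; the linear find and the names tagged and hit are illustrative assumptions, not the original code:

import org.apache.spark.sql.Row

// For each row of ds2, find the broadcast (c1, c2) range that contains its c11
// value. A linear scan is shown for brevity; a sorted array with binary search
// would be faster for ~1M ranges.
val tagged = ds2.rdd.mapPartitions { rows =>
  val ranges: Array[Row] = rangesBC.value               // the collected (c1, c2) pairs
  rows.map { row =>
    val c11 = row.getString(0)                          // c11 is the first column of ds2
    val hit = ranges.find(r => r.getString(0) <= c11 && c11 <= r.getString(1))
    (row, hit)
  }
}
tagged.count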

What am I doing wrong? Any pointers will be helpful. Thank you!

Add the explain for us to look at. Strikes me that a large 10M x 1M may take a while. – thebluephantom
But as ds1 is broadcast it should not take so much time. Also, it works in less than 10 minutes with the RDD-based APIs. Also, I have updated the query plan. – Ankur
Yes. I do a count for now. – Ankur
I saw it too late. – thebluephantom
Tried it, but even that is extremely slow. – Ankur

2 Answers

3 votes

I have run into this issue recently and found that Spark has strange partitioning behavior when cross joining large dataframes. If your input dataframes contain a few million records, then the cross-joined dataframe has a number of partitions equal to the product of the input dataframes' partitions, that is

Partitions of crossJoinDF = (Partitions of ds1) * (Partitions of ds2).

If ds1 or ds2 contains a few hundred partitions, then the cross-joined dataframe would have partitions on the order of ~10,000 or more (e.g. 100 × 100 = 10,000). That is far too many partitions, which results in excessive overhead from managing many small tasks and makes any computation (in your case, the filter) on the cross-joined dataframe very slow.

So how do you make the computation faster? First, check whether this is indeed the issue in your case:

scala> val crossJoinDF = ds2.crossJoin(ds1)
// This returns immediately because of Spark's lazy evaluation

scala> val crossJoinDFPartitions = crossJoinDF.rdd.partitions.size

Check the number of partitions on the cross-joined dataframe. If crossJoinDFPartitions > 10,000, then you do indeed have the same issue, i.e. the cross-joined dataframe has far too many partitions.

To make your operations on the cross-joined dataframe faster, reduce the number of partitions on the input DataFrames. For example:

scala> val ds1Repart = ds1.repartition(40)
scala> ds1Repart.rdd.partitions.size
res80: Int = 40

scala> val ds2Repart = ds2.repartition(40)
scala> ds2Repart.rdd.partitions.size
res81: Int = 40

scala> val crossJoinDF = ds1Repart.crossJoin(ds2Repart)
scala> crossJoinDF.rdd.partitions.size
res82: Int = 1600

scala> crossJoinDF.count()

The count() action should trigger execution of the cross join, and it should now return in a reasonable amount of time. The exact number of partitions you choose will depend on the number of cores available in your cluster.
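If it helps, here is one hedged way to derive the repartition counts from the cluster's parallelism instead of hard-coding 40; the names totalCores, targetCrossPartitions, perSide and crossDF are illustrative, and the multiplier of 4 tasks per core is just a rule of thumb:

// Illustrative sketch: pick per-side partition counts so the cross join ends up
// with roughly a few tasks per core rather than tens of thousands of tiny tasks.
val totalCores = spark.sparkContext.defaultParallelism   // total cores Spark sees
val targetCrossPartitions = totalCores * 4               // aim for a few tasks per core
val perSide = math.max(1, math.sqrt(targetCrossPartitions.toDouble).toInt)

val crossDF = ds1.repartition(perSide).crossJoin(ds2.repartition(perSide))
crossDF.rdd.partitions.size                              // ~ perSide * perSide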

The key takeaway here is to make sure that your cross-joined dataframe has a reasonable number of partitions (<< 10,000). You might also find this post useful; it explains the issue in more detail.

1 vote

I do not know if you are on bare metal, on AWS with spot, on-demand, or dedicated instances, or on VMs with Azure, et al. My take:

  • Appreciate that 10M x 1M is a lot of work, even if the .filter applies to the resultant cross join. It will take some time. What were your expectations?
  • Spark is all about scaling in a linear way, in general.
  • Data centers with VMs do not have dedicated hardware and hence do not have the fastest performance.

Then:

  • I ran 10M x 100K on Databricks Community Edition in a simulated set-up, with 0.86 cores and 6 GB on the Driver. That ran in 17 mins.
  • I ran the 10M x 1M from your example on a 4-node, non-dedicated AWS EMR cluster (with some EMR oddities, like reserving the Driver on a valuable instance!); it took 3 hours for partial completion. See the picture below.

[image: screenshot from the EMR run]

So, to answer your question: you did nothing wrong.

  • You just need more resources to allow more parallelisation.
  • I did add some explicit partitioning, as you can see.