4
votes

The following function compiles:

  def compare(dbo: Dataset[Cols], ods: Dataset[Cols]) = {
    val j = dbo.crossJoin(ods)
    // Tried val j = dbo.joinWith(ods, func.expr("true")) too
    j.take(5).foreach(r => println(r)) 
  }

But it fails with a runtime error when submitted to Spark:

Join condition is missing or trivial. (if using joinWith instead of crossJoin)
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1067)
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1064)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1064)
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1049)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2814)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
        at MappingPoint$.compare(MappingPoint.scala:43)
        at MappingPoint$.main(MappingPoint.scala:33)
        at MappingPoint.main(MappingPoint.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2
Why don't you use Spark 2.2 where this (and many more things) got improved a lot? Any reason to stick to 2.1.1? At least give the version a try and see if that fixes it. I think 2.2 could (as the code has changed). – Jacek Laskowski
I think I installed 2.1.1 (the newest version at that time?) a couple of months ago and didn't upgrade it. – ca9163d9
There's no installation of Spark but defining it as a library dependency or in PATH. Time to upgrade. At least you'll know if we're fighting something that's been already fixed before (and you won't waste your time). – Jacek Laskowski
Tried 2.2.0 with Scala 2.11.8 and still get the same error. – ca9163d9

2 Answers

9
votes

I found the solution in "How to enable Cartesian join in Spark 2.0?":

sparkConf.set("spark.sql.crossJoin.enabled", "true")
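For reference, here is a minimal sketch of where that setting might go when the session is built with SparkSession rather than a bare SparkConf. The app name and the local master are assumptions made only for illustration; the config key is the one from the line above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical app name and local master, chosen only for illustration.
val spark = SparkSession.builder()
  .appName("cross-join-example")
  .master("local[*]")
  .config("spark.sql.crossJoin.enabled", "true") // allow cartesian products
  .getOrCreate()

// The same key can also be set on an already running session.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
```

Setting it on the builder keeps the choice in one place; the runtime call is handy in spark-shell, where the session already exists.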
0
votes

The following works for me. I simplified the Cols case class to save typing, but otherwise I believe it is what you are attempting.

I used Spark 2.1.1:

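import org.apache.spark.sql.{Dataset, Row}
import spark.implicits._ // encoders needed by createDataset and map

case class Cols (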
    A: Int,
    B: String
)

val dbo: Dataset[Cols] = spark.createDataset(
    Seq[Cols](
        Cols(1, "One"),
        Cols(2, "Two")
    )
)
val ods: Dataset[Cols] = spark.createDataset(
    Seq[Cols](
        Cols(3, "Three"),
        Cols(4, "Four")
    )
)

val cartesian: Dataset[(Cols,Cols)] = dbo.crossJoin(ods).map {
    case Row(lA: Int, lB: String, rA: Int, rB: String) => (Cols(lA, lB), Cols(rA, rB))
}
val result: Dataset[Int] = cartesian.map {
    case (l: Cols, r: Cols) => 0
}

As long as Cols has at most 11 fields (that is, at most 22 columns after the crossJoin), you should be OK. Beyond that, you might run into issues trying to pattern match on more than 22 elements.

It looks to me like what you are submitting to Spark may still be using the joinWith line. Spark apparently detects the trivial join condition there and refuses to run it as a cartesian join.
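If matching on Row is a concern, one possible alternative is to keep the joinWith form and enable cross joins, so that the result is already typed as pairs of Cols. This is only a sketch: it assumes the spark.sql.crossJoin.enabled setting from the other answer, a local master, and a hypothetical app name, and it should be checked against the Spark version in use.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

// In a compiled application the case class must live at the top level,
// outside the method that uses it, so that an encoder can be derived.
case class Cols(A: Int, B: String)

// Hypothetical app name and local master, for illustration only.
val spark = SparkSession.builder()
  .appName("typed-cross-join")
  .master("local[*]")
  .config("spark.sql.crossJoin.enabled", "true") // assumption: cartesian products allowed
  .getOrCreate()
import spark.implicits._

val dbo: Dataset[Cols] = Seq(Cols(1, "One"), Cols(2, "Two")).toDS()
val ods: Dataset[Cols] = Seq(Cols(3, "Three"), Cols(4, "Four")).toDS()

// A constant-true condition pairs every row of dbo with every row of ods.
// joinWith keeps both sides as whole objects, so no Row pattern match is needed.
val pairs: Dataset[(Cols, Cols)] = dbo.joinWith(ods, lit(true))
pairs.show(false)
```

Because each side stays a single Cols value, the number of fields in Cols no longer affects the shape of the pattern you would write downstream.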