2 votes

Code -

import com.datastax.spark.connector._              // joinWithCassandraTable and "col".as(...)
import org.apache.spark.sql.functions.{lit, when}
import spark.implicits._                           // $"..." column syntax (spark is the SparkSession)

// Left-join SKU details onto the config details on (sku_num, ccn)
val configDetails2 = configDetails1
  .join(skuDetails, configDetails1.col("sku_num") === skuDetails.col("sku") &&
    configDetails1.col("ccn") === skuDetails.col("ccn"), "left_outer")
  .select(
    configDetails1.col("*"),
    skuDetails.col("part"),
    skuDetails.col("part_description"),
    skuDetails.col("part_qty"))
  // Replace the NULLs produced by the outer join with sentinel values
  .withColumn("item_name", when($"part".isNull, "DBNULL").otherwise($"part"))
  .withColumn("item_description", when($"part_description".isNull, "DBNULL").otherwise($"part_description"))
  .withColumn("item_qty", when($"part_qty".isNull, lit(0)).otherwise($"part_qty"))
  .drop("part", "part_description", "part_qty")

// Convert to an RDD of keys and join it directly against the Cassandra table
val itemKey = configDetails2.select("item_name").rdd
val itemMaster = itemKey
  .joinWithCassandraTable("dig_master", "item_master")
  .select("buyer", "cfg_name".as("cfg"), "item", "ms_name".as("scheduler"))
  .map(_._2)

Error -

Caused by: java.lang.IllegalArgumentException: requirement failed: Reordering broke ({ccn#98, sku_num#54, sku#223, part#224, ccn#243},ArrayBuffer(sku_num, ccn, sku, part, ccn)) was not ({ccn#98, ccn#222, sku_num#54, sku#223, part#224, ccn#243},ArrayBuffer(sku_num, ccn, sku, part, ccn, sku, part, ccn, sku_num, ccn, sku, part, ccn))

  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.cassandra.execution.DSEDirectJoinStrategy.apply(DSEDirectJoinStrategy.scala:69)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
  at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2590)
  at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2587)
  at core.CollabStandardConfig$.delayedEndpoint$core$CollabStandardConfig$1(CollabStandardConfig.scala:185)

I was not able to find any specific references to this error. Any help is appreciated.

Comments -

What is the DSE version? – Alex Ott
DSE 6.0.7, Spark 2.2.3.4 – sivan

1 Answer

1 vote

Did you upgrade the Scala version from 2.10 to 2.11? If so, try the option below. This is the part of your code to change:

val itemKey = configDetails2.select("item_name").rdd
val itemMaster = itemKey
  .joinWithCassandraTable("dig_master", "item_master")
  .select("buyer", "cfg_name".as("cfg"), "item", "ms_name".as("scheduler"))
  .map(_._2)

Change the code above to a DataFrame (SQL) join instead of converting the DataFrame to an RDD and joining with joinWithCassandraTable.
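
A minimal sketch of that suggestion, assuming item is the column to join on in item_master and reusing the keyspace, table, and column names from the question (the join key and the read options are assumptions, not something the question confirms):

import org.apache.spark.sql.cassandra._   // adds cassandraFormat to DataFrameReader

// Load the Cassandra table as a DataFrame instead of joining at the RDD level
val itemMasterDF = spark.read
  .cassandraFormat("item_master", "dig_master")   // (table, keyspace)
  .load()
  .select($"item", $"buyer", $"cfg_name".as("cfg"), $"ms_name".as("scheduler"))

// The same join expressed in the DataFrame API
val itemMaster = configDetails2
  .join(itemMasterDF, configDetails2("item_name") === itemMasterDF("item"))
  .select("buyer", "cfg", "item", "scheduler")

Staying in the DataFrame API should avoid the Dataset-to-RDD conversion that triggered the failing plan (Dataset.rdd in the stack trace), and leaves it to DSE to decide whether the join qualifies for its direct-join optimization.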