2
votes

Problem

Maybe this is due to my lack of Scala knowledge, but it seems like adding another level to the for comprehension should just work. If the first for comprehension line is commented out, the code works. I ultimately want a Set[Int] instead of '1 to 2', but it serves to show the problem. The first two lines of the for should not need a type specifier, but I include it to show that I've tried the obvious.

Tools/Jars

  • IntelliJ 2016.1
  • Java 8
  • Scala 2.10.5
  • Cassandra 3.x
  • spark-assembly-1.6.0-hadoop2.6.0.jar (pre-built)
  • spark-cassandra-connector_2.10-1.6.0-M1-SNAPSHOT.jar (pre-built)
  • spark-cassandra-connector-assembly-1.6.0-M1-SNAPSHOT.jar (I built)

Code

case class NotifHist(intnotifhistid:Int, eventhistids:Seq[Int], yosemiteid:String, initiatorname:String)
case class NotifHistSingle(intnotifhistid:Int, inteventhistid:Int, dataCenter:String, initiatorname:String)

object SparkCassandraConnectorJoins {
  def joinQueryAfterMakingExpandedRdd(sc:SparkContext, orgNodeId:Int) {

  val notifHist:RDD[NotifHistSingle] = for {
    orgNodeId:Int <- 1 to 2   // comment out this line and it works
    notifHist:NotifHist <- sc.cassandraTable[NotifHist](keyspace, "notifhist").where("intorgnodeid = ?", orgNodeId)
    eventHistId <- notifHist.eventhistids
  } yield NotifHistSingle(notifHist.intnotifhistid, eventHistId, notifHist.yosemiteid, notifHist.initiatorname)
  ...etc...
 }

Compilation Output

Information:3/29/16 8:52 AM - Compilation completed with 1 error and 0 warnings in 1s 507ms
       /home/jpowell/Projects/SparkCassandraConnector/src/com/mir3/spark/SparkCassandraConnectorJoins.scala
**Error:(88, 21) type mismatch;
 found   : scala.collection.immutable.IndexedSeq[Nothing]
 required: org.apache.spark.rdd.RDD[com.mir3.spark.NotifHistSingle]
      orgNodeId:Int <- 1 to 2
                    ^**

Later

@slouc Thanks for the comprehensive answer. I was using the for comprehension's syntactic sugar to also keep state from the second statement to fill elements in the NotifHistSingle ctor, so I don't see how to get the equivalent map/flatmap to work. Therefore, I went with the following solution:

def joinQueryAfterMakingExpandedRdd(sc:SparkContext, orgNodeIds:Set[Int]) {

  def notifHistForOrg(orgNodeId:Int): RDD[NotifHistSingle] = {
    for {
      notifHist <- sc.cassandraTable[NotifHist](keyspace, "notifhist").where("intorgnodeid = ?", orgNodeId)
      eventHistId <- notifHist.eventhistids
    } yield NotifHistSingle(notifHist.intnotifhistid, eventHistId, notifHist.yosemiteid, notifHist.initiatorname)
  }
  val emptyTable:RDD[NotifHistSingle] = sc.emptyRDD[NotifHistSingle]
  val notifHistForAllOrgs:RDD[NotifHistSingle] = orgNodeIds.foldLeft(emptyTable)((accum, oid) => accum ++ notifHistForOrg(oid))
}
1

1 Answers

0
votes

For comprehension is actually syntax sugar; what's really going on underneath is a series of chained flatMap calls, with a single map at the end which replaces yield. Scala compiler translates every for comprehension like this. If you use if conditions in your for comprehension, they are translated into filters, and if you don't yield anything foreach is used. For more information, see here.

So, to explain on your case - this:

val notifHist:RDD[NotifHistSingle] = for {
  orgNodeId:Int <- 1 to 2   // comment out this line and it works
  notifHist:NotifHist <- sc.cassandraTable[NotifHist](keyspace, "notifhist").where("intorgnodeid = ?", orgNodeId)
  eventHistId <- notifHist.eventhistids
} yield NotifHistSingle(...)

is actually translated by the compiler to this:

val notifHist:RDD[NotifHistSingle] = (1 to 2)
  .flatMap(x => sc.cassandraTable[NotifHist](keyspace, "notifhist").where("intorgnodeid = ?", x)
  .flatMap(x => x.eventhistids)
  .map(x => NotifHistSingle(...))

You are getting the error if you include the 1 to 2 line because that makes your for comprehension operate on a sequence (vector, to be more precise). So when invoking flatMap(), compiler expects you to follow up with a function that transforms each element of your vector to a GenTraversableOnce. If you take a closer look at the type of your for expression (most IDEs will display it just by hovering over it) you can see it for yourself:

def flatMap[B, That](f: A => GenTraversableOnce[B])(implicit bf: CanBuildFrom[Repr, B, That]): That

This is the problem. Compiler doesn't know how to flatMap the vector 1 to 10 using a function that returns CassandraRDD. It wants a function that returns GenTraversableOnce. If you remove the 1 to 2 line then you remove this restriction.

Bottom line - if you want to use a for comprehension and yield values out of it, you have to obey the type rules. It's impossible to flatten a sequence consisting of elements which are not sequences and cannot be turned into sequences.

You can always map instead of flatMap since map is less restrictive (it requires A => B instead of A => GenTraversableOnce[B]). This means that instead of getting all results in one giant sequence, you will get a sequence where each element is a group of results (one group for each query). You can also play around the types, trying to get a GenTraversableOnce from your query result (e.g. invoking sc.cassandraTable().where().toArray or something; I don't really work with Cassandra so I don't know).