
I am trying to get a simple example working that maps rows from Cassandra to a Scala case class using Apache Spark 1.1.1, Cassandra 2.0.11, and the spark-cassandra-connector (v1.1.0). I have reviewed the documentation at the spark-cassandra-connector GitHub page, planetcassandra.org, and DataStax, and generally searched around, but have not found anyone else encountering this issue. So here goes...

I am building a tiny Spark application with sbt (0.13.5), Scala 2.10.4, and Spark 1.1.1 against Cassandra 2.0.11. Modelling the example from the spark-cassandra-connector docs, the following two lines raise an error in my IDE and fail to compile.

case class SubHuman(id:String, firstname:String, lastname:String, isGoodPerson:Boolean)
val foo = sc.cassandraTable[SubHuman]("nicecase", "human").select("id","firstname","lastname","isGoodPerson").toArray

The simple error presented by Eclipse is:

No RowReaderFactory can be found for this type

The compile error is only slightly more verbose:

> compile
[info] Compiling 1 Scala source to /home/bkarels/dev/simple-case/target/scala-2.10/classes...
[error] /home/bkarels/dev/simple-case/src/main/scala/com/bradkarels/simple/SimpleApp.scala:82: No RowReaderFactory can be found for this type
[error]     val foo = sc.cassandraTable[SubHuman]("nicecase", "human").select("id","firstname","lastname","isGoodPerson").toArray
[error]                                          ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 1 s, completed Dec 10, 2014 9:01:30 AM
>

Scala source:

package com.bradkarels.simple

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
// Likely don't need this import - but throwing darts hits the bullseye once in a while...
import com.datastax.spark.connector.rdd.reader.RowReaderFactory

object CaseStudy {

  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext("spark://127.0.0.1:7077", "simple", conf)

    case class SubHuman(id:String, firstname:String, lastname:String, isGoodPerson:Boolean)
    val foo = sc.cassandraTable[SubHuman]("nicecase", "human").select("id","firstname","lastname","isGoodPerson").toArray
  }
}

With the bothersome lines removed, everything compiles fine, assembly works, and I can perform other Spark operations normally. For example, if I drop in the following instead:

val rdd:CassandraRDD[CassandraRow] = sc.cassandraTable("nicecase", "human")

I get back the RDD and can work with it as expected. That said, I do not suspect that my sbt project, assembly plugin, etc. are contributing to the issue. The working source (less the new attempt to map rows to a case class as the connector intends) can be found on github here.
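
For reference, a rough sketch of the kind of row-level access that does work, assuming the same "nicecase"/"human" table and the column names used by the case class above:

// Row-level access via CassandraRow works fine; only the case class mapping fails.
val rdd: CassandraRDD[CassandraRow] = sc.cassandraTable("nicecase", "human")
val firstNames = rdd.map(row => row.getString("firstname")).collect()
firstNames.foreach(println)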

But, to be more thorough, my build.sbt:

name := "Simple Case"

version := "0.0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1",
    "org.apache.spark" %% "spark-sql" % "1.1.1",
    "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" withSources() withJavadoc()
  )
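
For completeness, the assembly plugin mentioned above is wired up in project/plugins.sbt, roughly like this (the sbt-assembly version shown is illustrative, not necessarily the one in my project):

// project/plugins.sbt -- sketch of the sbt-assembly wiring; version is an assumption
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")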

So the question is: what have I missed? I am hoping this is something silly, but if you have encountered this and can help me get past this puzzling little issue, I would very much appreciate it. Please let me know if there are any other details that would be helpful in troubleshooting.

Thank you.


1 Answer


This may be down to my newness with Scala in general, but I resolved the issue by moving the case class declaration out of the main method. The simplified source now looks like this:

package com.bradkarels.simple

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._

object CaseStudy {

  case class SubHuman(id:String, firstname:String, lastname:String, isGoodPerson:Boolean)

  def main(args: Array[String]) {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext("spark://127.0.0.1:7077", "simple", conf)

    val foo = sc.cassandraTable[SubHuman]("nicecase", "human").select("id","firstname","lastname","isGoodPerson").toArray
  }
}
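
The connector resolves the RowReaderFactory implicitly from the case class, and it evidently cannot do that for a class declared locally inside a method body; declaring the case class as a member of the enclosing object (or at the top level of the file) makes the implicit resolvable. With that in place, the mapped result can be used directly, e.g. (a trivial illustration using the columns above):

// foo is an Array[SubHuman], so ordinary collection operations apply.
foo.foreach { sh =>
  println(s"${sh.firstname} ${sh.lastname} (good person: ${sh.isGoodPerson})")
}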

The complete source (updated and fixed) can be found on GitHub: https://github.com/bradkarels/spark-cassandra-to-scala-case-class