1 vote

I am trying to run a Scala program on Spark that accesses Cassandra through the DataStax Cassandra connector.

I am getting the following error:

    15/04/30 17:43:44 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
    com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.sql.cassandra.CassandraSQLRow
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
    at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
    at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.Sort$$anonfun$execute$3$$anonfun$apply$4.apply(basicOperators.scala:209)
    at org.apache.spark.sql.execution.Sort$$anonfun$execute$3$$anonfun$apply$4.apply(basicOperators.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:618)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.CassandraSQLRow
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 48 more

I am running the following configuration:

  • DSE 4.6
  • Cassandra 2.0.11.83
  • Spark 1.2.2
  • Scala 2.10.4
  • Cassandra Connector 1.2.0-rc3

I thought the issue could be related to Spark not properly loading the connector JAR, so I have tried the following:

1) Add the connector JAR to spark-env.sh

    SPARK_CLASSPATH=/home/spark/jars/spark-cassandra-connector_2.10-1.2.0-rc3.jar

Spark complains this setting is deprecated.

2) Add the connector JAR to spark-defaults.conf

    spark.executor.extraClassPath /home/spark/jars/spark-cassandra-connector_2.10-1.2.0-rc3.jar

Same issue.

3) Add the connector JAR using --driver-class-path

I am getting the following exception:

    Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/cache/CacheLoader

4) Add the connector JAR using the --jars option when running spark-submit (invocation sketched below)

Same issue.
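For reference, a sketch of the spark-submit invocation used for this attempt; the application class, master URL, and assembly name are placeholders rather than values from the original setup:

    # app class, master URL, and assembly name are hypothetical
    spark-submit \
      --class com.example.MyApp \
      --master spark://sparkmaster:7077 \
      --jars /home/spark/jars/spark-cassandra-connector_2.10-1.2.0-rc3.jar \
      my-app-assembly.jar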

The program works fine when I run it from IntelliJ, but when I assemble it into a fat JAR and run it with spark-submit, I get the error shown above.
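One quick way to check whether the missing class actually made it into the fat JAR (the assembly name here is a placeholder):

    # hypothetical assembly name; list the JAR contents and filter for the class
    jar tf my-app-assembly.jar | grep CassandraSQLRow

If the class is present in the assembly, the failure points at a class-loader problem rather than a packaging omission.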

I thought it could be related to the following issue:

https://datastax-oss.atlassian.net/browse/SPARKC-23

which is supposed to be fixed in connector version 1.1.2, but the issue still reproduces on the version I am using (1.2.0-rc3).

My build.sbt looks like this:

    scalaVersion := "2.10.4"

    val sparkVersion = "1.2.2"

    val cassandraConnectorVersion = "1.2.0-rc3"

    libraryDependencies ++= {
      Seq(
        ("org.apache.spark" %% "spark-core" % sparkVersion).
           exclude("org.mortbay.jetty", "servlet-api").
           exclude("commons-beanutils", "commons-beanutils-core").
           exclude("commons-collections", "commons-collections").
           exclude("commons-logging", "commons-logging").
           exclude("com.esotericsoftware.minlog", "minlog").
           exclude("org.apache.hadoop", "hadoop-yarn-api").
           exclude("org.apache.hadoop", "hadoop-yarn-common").
           exclude("org.slf4j", "jcl-over-slf4j").
           exclude("javax.servlet", "javax.servlet-api").
           exclude("org.eclipse.jetty.orbit", "javax.servlet").
           exclude("org.eclipse.jetty.orbit", "javax.activation").
           exclude("org.eclipse.jetty.orbit", "javax.mail.glassfish").
           exclude("org.eclipse.jetty.orbit", "javax.transaction"), // % "provided",
        "org.apache.spark" %% "spark-sql" % sparkVersion, // % "provided",
        "org.apache.spark" %% "spark-mllib" % sparkVersion, // % "provided",
        "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectorVersion,
        "javax.servlet" % "javax.servlet-api" % "3.0.1",
        "org.mongodb" % "mongo-java-driver" % "2.12.4",
        "org.mongodb" % "casbah_2.10" % "2.8.0",
        "com.typesafe" % "config" % "1.2.1",
        "org.scalanlp" %% "breeze" % "0.10",
        "joda-time" % "joda-time" % "2.7",
        "org.rogach" %% "scallop" % "0.9.5",
        "org.apache.commons" % "commons-io" % "1.3.2",
        "com.google.code.gson" % "gson" % "2.3.1",
        "com.novus" %% "salat-core" % "1.9.9"
      )}

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

    resolvers += "Sonatype OSS Releases" at "http://oss.sonatype.org/content/repositories/releases/"

UPDATE:

I have tried the same using Spark 1.1.1 and Spark-Connector 1.1.1. I am experiencing the same issues.


2 Answers

2 votes

The JIRA referenced above just adds more documentation on where to include JARs. Your error seems to stem from an incompatibility between the 1.2.0-rc3 connector, which expects Spark 1.2, and DSE 4.6, which ships Spark 1.1.0.

Try using the 1.1.X version of the connector.
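In the build.sbt shown above, that means pinning the connector dependency to a 1.1.x release, for example (1.1.1 is the release the asker later tried):

    val cassandraConnectorVersion = "1.1.1"

    "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectorVersion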


Edit:

This seems to still be hitting an unfortunate class-loader issue with Spark.

I was hoping a newer Spark release would fix this, but the issue is still present. The error occurs because the Spark executor and the KryoSerializer code use different class loaders. The workaround is to make sure the connector JAR is NOT part of your fat JAR, so that the entire library is loaded by the system class loader. Instead, manually copy the JAR to all the executors and specify the class path with spark.executor.extraClassPath.

So the key is to make sure the Spark Cassandra Connector classes are NOT on the ExecutorURLClassLoader but on the system class loader instead.
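On the build side, one way to keep the connector classes out of the fat JAR while still compiling against them is to mark the dependency as provided; a minimal sketch, assuming the fat JAR is built with sbt-assembly:

    // compiled against, but excluded from the assembly (and thus from the ExecutorURLClassLoader)
    "com.datastax.spark" %% "spark-cassandra-connector" % cassandraConnectorVersion % "provided"

The connector assembly JAR then has to be shipped to each executor host and put on the class path, as in the example below.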

Here is an example I ran using the REPL on Spark 1.2.1:

    automaton@ubuntu:~/spark-1.2.1-bin-hadoop1$ ./bin/spark-shell \
        --master spark://ubuntu:7077 \
        --driver-class-path /home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar \
        --conf spark.executor.extraClassPath=/home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar \
        --conf spark.cassandra.connection.host=127.0.0.1

    scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
    import org.apache.spark.sql.cassandra.CassandraSQLContext

    scala> val cc = new CassandraSQLContext(sc)
    cc: org.apache.spark.sql.cassandra.CassandraSQLContext = org.apache.spark.sql.cassandra.CassandraSQLContext@3f8aef3e

    scala> cc.sql("SELECT * FROM test.fun as a JOIN test.fun as b ON (a.k = b.v)").collect
    res0: Array[org.apache.spark.sql.Row] = Array([31,31,31,31],  .....

Notice how I use --driver-class-path to get the jar on the driver class-loader, and then use --conf spark.executor.extraClassPath to get the jar onto the system class loader of the executor JVM.
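The same settings can go into conf/spark-defaults.conf so they do not have to be repeated on every invocation; a sketch reusing the paths from the example above (spark.driver.extraClassPath is the config equivalent of --driver-class-path):

    spark.driver.extraClassPath     /home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar
    spark.executor.extraClassPath   /home/automaton/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar
    spark.cassandra.connection.host 127.0.0.1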

0 votes

Looks like this JIRA:

https://datastax-oss.atlassian.net/browse/SPARKC-23

which was fixed in connector 1.1.2.