1
votes

I have spark-streaming code which works in client mode: it reads data from kafka, does some processing, and use spark-cassandra-connector to insert data to cassandra.

When I use the "--deploy-mode cluster", data does not get inserted, and I get the following error:

Exception in thread "streaming-job-executor-53" java.lang.NoClassDefFoundError: com/datastax/spark/connector/ColumnSelector at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:94) at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:88) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I added dependancy for connector like this:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0" % "provided"

This is my application code:

    val measurements = KafkaUtils.createDirectStream[
  Array[Byte],
  Array[Byte],
  DefaultDecoder,
  DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
  .map {
    case (k, v) => {
      val decoder = new AvroDecoder[WattioMeasure](null,
        WattioMeasure.SCHEMA$)
      decoder.fromBytes(v)
    }
  }

//inserting into WattioRaw
WattioFunctions.run(WattioFunctions.
  processWattioRaw(measurements))(
  (rdd: RDD[
    WattioTenantRaw], t: Time) => {
    rdd.cache()
    //get all the different tenants
    val differentTenants = rdd.map(a
    => a.tenant).distinct().collect()
    // for each tenant, create keyspace value and flush to cassandra
    differentTenants.foreach(tenant => {
      val keyspace = tenant + "_readings"
      rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
    })
    rdd.unpersist(true)
  }
)

ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()
3
how are you specifying the connector dependency at runtime? What is your full launch command?RussS

3 Answers

1
votes

You need to make sure your JAR is available to the workers. The spark master will open up a file server once the execution of the job starts.

You need to specify the path to your uber jar either by using SparkContext.setJars, or via the --jars flag passed to spark-submit.

From the documentation

When using spark-submit, the application jar along with any jars included with the --jars option will be automatically transferred to the cluster. Spark uses the following URL scheme to allow different strategies for disseminating jars

0
votes

Actually I solved it by removing "provided" in the dependancy list, so that sbt packaged spark-cassandra-connector to my assembly jar.

Interesting thing is that in my launch script, even when I tryed to use

spark-submit --repositories "location of my artifactory repository" --packages "spark-cassandra-connector"

or

spark-submit --jars "spark-cassandra-connector.jar"

both of them failed!

0
votes

scope provided means, you expect the JDK or a container to provide the dependency at runtime and that particular dependency jar will not be part of your final application War/jar you are creating hence that error.