I have spark-streaming code which works in client mode: it reads data from kafka, does some processing, and use spark-cassandra-connector to insert data to cassandra.
When I use the "--deploy-mode cluster", data does not get inserted, and I get the following error:
Exception in thread "streaming-job-executor-53" java.lang.NoClassDefFoundError: com/datastax/spark/connector/ColumnSelector at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:94) at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:88) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
I added dependancy for connector like this:
"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0" % "provided"
This is my application code:
val measurements = KafkaUtils.createDirectStream[
Array[Byte],
Array[Byte],
DefaultDecoder,
DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
.map {
case (k, v) => {
val decoder = new AvroDecoder[WattioMeasure](null,
WattioMeasure.SCHEMA$)
decoder.fromBytes(v)
}
}
//inserting into WattioRaw
WattioFunctions.run(WattioFunctions.
processWattioRaw(measurements))(
(rdd: RDD[
WattioTenantRaw], t: Time) => {
rdd.cache()
//get all the different tenants
val differentTenants = rdd.map(a
=> a.tenant).distinct().collect()
// for each tenant, create keyspace value and flush to cassandra
differentTenants.foreach(tenant => {
val keyspace = tenant + "_readings"
rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw")
})
rdd.unpersist(true)
}
)
ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()