I am new to Spark, Scala, and Cassandra. Using Spark, I am trying to fetch some ids from MySQL:
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

Class.forName("com.mysql.jdbc.Driver").newInstance

val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select id from user limit ?, ?",
  1, 20, 10,
  r => r.getString("id"))

myRDD.foreach(println)
I can see the ids printed on the console.

Now, for each fetched id, I need to compute a sum over a table in Cassandra. I wrote a function that I can call with an individual id:
object HelloWorld {
  def sum(id: String): Unit = {
    val each_spark_rdd = uplink_rdd.select("number").where("id = ?", id).as((c: Int) => c).sum
    println(each_spark_rdd)
  }
}
and declared uplink_rdd as
val uplink_rdd = sc.cassandraTable("keyspace", "table")
I can call the function with an individual id and see the sum:
scala> HelloWorld.sum("5")
50
However, when I try to run the same function on each fetched id, as
myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or
for (id <- myRDD) HelloWorld.sum(id)
each attempt throws the same exception:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    ... (spark-shell REPL and SparkSubmit frames omitted)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf
After reading Apache Spark: "SparkException: Task not serializable", I tried marking the RDDs as @transient:
@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....
but I still get the same error.
Please let me know how I can compute the sum from the Cassandra table for each id fetched from MySQL.
When you call foreach, Spark tries to serialize HelloWorld.sum to pass it to each of the executors - but to do so it has to serialize the function's closure too, which includes uplink_rdd (and that isn't serializable). However, when you find yourself trying to do this sort of thing, it is usually just an indication that you want to be using a join or something similar instead. – Alec
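A minimal sketch of the join-based approach the comment suggests, assuming "id" is the partition key of keyspace.table and "number" is an Int column (both names taken from the question); joinWithCassandraTable comes with the same com.datastax.spark.connector._ import already used above:

val sums = myRDD
  .map(Tuple1(_))                              // key each MySQL id as a tuple matching the partition key
  .joinWithCassandraTable("keyspace", "table") // fetches only the matching Cassandra partitions
  .map { case (Tuple1(id), row) => (id, row.getInt("number")) }
  .reduceByKey(_ + _)                          // per-id sum of "number"

sums.collect().foreach { case (id, total) => println(s"$id -> $total") }

Alternatively, if the list of ids is small, collecting it to the driver sidesteps serialization entirely, because HelloWorld.sum then runs on the driver, where uplink_rdd is in scope:

myRDD.collect().foreach(HelloWorld.sum)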