1
votes

I am new to Spark, Scala and Cassandra. Using Spark I am trying to get some ids from MySQL.

import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
Class.forName("com.mysql.jdbc.Driver").newInstance

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

val myRDD = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(url, username, password),
  "select id from user limit ?, ?",
  1, 20, 10,
  r => r.getString("id"))
myRDD.foreach(println)

I can see the ids printed on the console.

Now, for each fetched id, I need to run a sum over a table in Cassandra.

I created a function that I can call with an individual id:

object HelloWorld {
  def sum(id: String): Unit = {
    // Sum the "number" column over all rows matching this id
    val each_spark_rdd = uplink_rdd.select("number").where("id=?", id).as((c: Int) => c).sum
    println(each_spark_rdd)
  }
}

and declared uplink_rdd as

 val uplink_rdd = sc.cassandraTable("keyspace", "table")

Calling the function with an individual id prints the sum:

scala> HelloWorld.sum("5") 
50

But when I try to run the same function on each fetched id, as

myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or 
for (id <- myRDD) HelloWorld.sum(id)

all three give the same exception:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:59)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:63)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:65)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:67)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69)
    at $iwC$$iwC$$iwC$$iwC$$iwC.(:71)
    at $iwC$$iwC$$iwC$$iwC.(:73)
    at $iwC$$iwC$$iwC.(:75)
    at $iwC$$iwC.(:77)
    at $iwC.(:79)
    at (:81)
    at .(:85)
    at .()
    at .(:7)
    at .()
    at $print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkConf

After reading 'Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually', I tried adding @transient to the RDDs:

@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....

but I still get the same error.

Please let me know how I can find the sum from the Cassandra table for each id fetched from MySQL.

1
The problem is that you are essentially trying to perform an action inside a transformation - transformations and actions in Spark cannot be nested. When you call foreach, Spark tries to serialize HelloWorld.sum to pass it to each of the executors - but to do so it has to serialize the function's closure too, which includes uplink_rdd (and that isn't serializable). However, when you find yourself trying to do this sort of thing, it is usually just an indication that you want to be using a join or something similar instead. - Alec
Did you check this out? link - Cfuentes
object HelloWorld extends serializable - 42n4

1 Answer

0
votes

Your code is attempting to use uplink_rdd inside a transformation on myRDD. A closure applied to an RDD cannot contain another RDD, because RDDs can only be used from the driver.
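The failure can be reproduced without Spark. The following is a minimal sketch using plain Java serialization; DriverOnlyHandle and ClosureDemo are hypothetical names made up for illustration, with DriverOnlyHandle standing in for a driver-only object such as uplink_rdd. It is only an analogy for the check that appears as ClosureCleaner.ensureSerializable in the trace above, not Spark's actual code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-only object such as an RDD.
// It deliberately does not extend Serializable.
class DriverOnlyHandle {
  def total(id: String): Int = id.length
}

object ClosureDemo {
  // True if Java serialization accepts `obj`, false if it rejects it.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // A closure that captures only a String can be serialized.
  def plainClosure(): String => String = {
    val prefix = "id-"
    (id: String) => prefix + id
  }

  // A closure that captures the handle drags it into the serialized form.
  def capturingClosure(): String => Int = {
    val handle = new DriverOnlyHandle
    (id: String) => handle.total(id)
  }
}
```

Here ClosureDemo.serializes(ClosureDemo.plainClosure()) succeeds, while ClosureDemo.serializes(ClosureDemo.capturingClosure()) is rejected because the captured handle is not serializable. Marking a field @transient only skips it during serialization, so the function would still have nothing to call on the other side.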

You should instead use something along the lines of joinWithCassandraTable, which uses the ids in myRDD to pull the matching rows from Cassandra in a parallel, distributed way. This works if each id selects a single partition key in Cassandra.

See the Docs
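As a rough sketch of what that could look like for the question's data, assuming (this is not stated in the question) that id is the table's entire partition key and number is an int column, and that the code runs in the same spark-shell session where sc and myRDD are defined:

```scala
import com.datastax.spark.connector._

// Assumption: "id" is the whole partition key of keyspace.table,
// and "number" is an int column.
val sumsPerId = myRDD
  .map(Tuple1(_))                               // wrap each id so it maps onto the partition key
  .joinWithCassandraTable("keyspace", "table")  // fetch only the rows for these ids
  .map { case (Tuple1(id), row) => (id, row.getInt("number")) }
  .reduceByKey(_ + _)                           // one sum per id

sumsPerId.collect().foreach { case (id, total) => println(s"$id -> $total") }
```

Ids with no matching rows in Cassandra do not appear in the result, since the join only emits pairs for rows that exist.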

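Another option is to query Cassandra manually, using a session drawn from the pool the connector uses.

import com.datastax.spark.connector.cql.CassandraConnector

val cc = CassandraConnector(sc.getConf)
myRDD.mapPartitions { it =>
  // One session per partition, borrowed from the connector's pool
  cc.withSessionDo { session =>
    // Run a query per id and materialize the results before the session is released
    it.map(id => session.execute("whatever query you want")).toList.iterator
  }
}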

If you are actually summing over multiple partitions in Cassandra, you need to create a new RDD for each id. That has to happen on the driver, so collect the ids first.

Something like

myRDD.collect.foreach(HelloWorld.sum)