I have an RDD of type RDD[((Long, Long), (Long, Long))] and I need to convert or transform it into an RDD[((Long, Long), (Long, Long, Long, Long))], where the second RDD's tuple values are derived from the first RDD's values by a function.
I am trying to achieve this with a map function, but I think I am doing something wrong here. Please help me solve the issue.
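In isolation, the kind of mapping I mean would look roughly like the snippet below (the data is made up purely for illustration, and up_down is the function defined in the full code that follows):

val pairs: RDD[((Long, Long), (Long, Long))] =
  sc.parallelize(Seq(((1L, 2L), (1L, 0L)), ((3L, 4L), (0L, 1L))))
// Keep each key, expand its (Long, Long) value into a (Long, Long, Long, Long) value.
val expanded: RDD[((Long, Long), (Long, Long, Long, Long))] =
  pairs.map { case (key, (dirX, dirY)) => (key, up_down(dirX, dirY)) }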
Here is the full code:
package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map
class ListItemCorrelation(sc: SparkContext) extends Serializable {
  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    if (dirX.equals(1)) {
      if (dirY.equals(1)) {
        return (1, 0, 0, 0)
      } else {
        return (0, 1, 0, 0)
      }
    } else {
      if (dirY.equals(1)) {
        return (0, 0, 1, 0)
      } else {
        return (0, 0, 0, 1)
      }
    }
  }
  def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotes = sc.textFile(votes)
    val userVotesPairs = userVotes.map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1))
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
    //More functionality
    return result
  }
}

object ListItemCorrelation extends Serializable {
  def main(args: Array[String]) {
    val votes = args(0)
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val job = new ListItemCorrelation(context)
    val results = job.run(votes)
    val output = args(1)
    results.saveAsTextFile(output)
    context.stop()
  }
}
When I try to run this code, I get the following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at com.ranker.correlation.listitem.ListItemCorrelation.run(ListItemCorrelation.scala:34)
    at com.ranker.correlation.listitem.ListItemCorrelation$.main(ListItemCorrelation.scala:47)
    at com.ranker.correlation.listitem.ListItemCorrelation.main(ListItemCorrelation.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@4248e66b)
    - field (class: com.ranker.correlation.listitem.ListItemCorrelation, name: sc, type: class org.apache.spark.SparkContext)
    - object (class com.ranker.correlation.listitem.ListItemCorrelation, com.ranker.correlation.listitem.ListItemCorrelation@270b6b5e)
    - field (class: com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, name: $outer, type: class com.ranker.correlation.listitem.ListItemCorrelation)
    - object (class com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more
This error happens while executing the following line:
var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
I am very new to Scala; please help me find the right way to do this.
Why not pass SparkContext down as an argument for run? With the current implementation it will be dragged all the way down, and since it is not serializable, it will cause a failure. - zero323
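A minimal sketch of what that suggestion could look like (the join/filter logic is kept exactly as in the question; the //More functionality part is still omitted, so the last map is returned directly):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

class ListItemCorrelation extends Serializable {

  def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
    // == is used instead of .equals(1): a boxed Long never .equals an Int literal,
    // so the original comparison would always fall through to the else branch.
    if (dirX == 1L) {
      if (dirY == 1L) (1L, 0L, 0L, 0L) else (0L, 1L, 0L, 0L)
    } else {
      if (dirY == 1L) (0L, 0L, 1L, 0L) else (0L, 0L, 0L, 1L)
    }
  }

  // SparkContext is a parameter of run instead of a constructor field, so the
  // ListItemCorrelation instance captured by the closure that calls up_down
  // no longer holds anything that is not serializable.
  def run(sc: SparkContext, votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
    val userVotesPairs = sc.textFile(votes).map { t =>
      val p = t.split(",")
      (p(0).toLong, (p(1).toLong, p(2).toLong))
    }
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1 < t._2._1)
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
    first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
  }
}

object ListItemCorrelation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
    val context = new SparkContext(conf)
    val results = new ListItemCorrelation().run(context, args(0))
    results.saveAsTextFile(args(1))
    context.stop()
  }
}

An equivalent alternative is to move up_down into the companion object (or declare it as a local def inside run); then the closure does not capture the enclosing instance at all, and sc can stay as a constructor field.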