
The following program tries to invoke three functions for every row (inside an RDD `map`):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats

    class TagCalculation extends Serializable {
      def test1(x: String) = x + " test1"
      def test2(x: String) = x + "test2"
      def test3(x: String) = x + "test3"
      def test5(arg1: java.lang.Integer, arg2: String, arg3: scala.collection.immutable.$colon$colon[Any]) = "test mix2"
    }

    val df = sqlContext.createDataFrame(Seq((1, "Android"), (2, "iPhone")))
    val get_test = new TagCalculation
    val field = Array("test1", "test2", "test3")

    val bb = df.rdd.map(row => {
      val reValue1 = "start"
      val ret = for (every <- field)
        yield {
          val test_para = Array(reValue1)
          val argtypes = test_para.map(_.getClass)
          val method4 = get_test.getClass.getMethod(every, argtypes: _*)

          val bbq = method4.invoke(get_test, test_para: _*)

          if (field.last == every)
            bbq
        }
      ret.last
    })
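The reflective lookup in this loop is not what fails; the exception below is about serialization. As a minimal sketch outside Spark, assuming the same single-`String` methods (`test5` omitted) and a hypothetical helper object `ReflectiveCalls`, the lookup and invocation can be checked on their own:

```scala
import java.lang.reflect.Method

// Same single-String methods as the question's TagCalculation (test5 omitted).
class TagCalculation extends Serializable {
  def test1(x: String): String = x + " test1"
  def test2(x: String): String = x + "test2"
  def test3(x: String): String = x + "test3"
}

// Hypothetical helper, not part of the original program.
object ReflectiveCalls {
  // Resolves each method name once, then invokes it on `target` with `input`.
  // classOf[String] is what the question obtains from "start".getClass.
  def invokeAll(target: AnyRef, names: Seq[String], input: String): Seq[String] = {
    val methods: Seq[Method] =
      names.map(name => target.getClass.getMethod(name, classOf[String]))
    methods.map(m => m.invoke(target, input).asInstanceOf[String])
  }
}
```

Called with `"start"` and the names `test1`, `test2`, `test3`, this returns the three results in order; the question's loop keeps only the last one, `"starttest3"`.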

But running the program fails with the following error:

    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:313)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.map(RDD.scala:313)
        ........
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$

Any pointers?


It may be caused by `implicit val formats = DefaultFormats`, but I need `formats` to extract values before the `map`.


1 Answer


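The problem is that `TagCalculation` is defined inside the calling class, the same class where you create and use the object. An instance of a Scala inner class keeps a hidden reference to its enclosing instance, so when Spark serializes the closure that uses `get_test`, it also tries to serialize the enclosing class, including the non-serializable `formats` (`DefaultFormats`). Move `TagCalculation` out of the calling class (or into a separate file) and the `NotSerializableException` should go away.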
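A minimal sketch of that mechanism without Spark or json4s, assuming plain Java serialization (which Spark uses for task closures). `FormatsStandIn`, `CallingClass`, and `SerializationCheck` are hypothetical names: the first stands in for `DefaultFormats`, the second for your calling class, which is marked `Serializable` so the failure surfaces at `formats`, as in the trace above. Compile it as an ordinary source file so that `TagCalculation` really is top-level.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.json4s.DefaultFormats: deliberately NOT Serializable.
class FormatsStandIn

// Hypothetical stand-in for the calling class: it owns the non-serializable formats.
class CallingClass extends Serializable {
  val formats: FormatsStandIn = new FormatsStandIn

  // Inner class: every instance keeps a hidden reference to its enclosing
  // CallingClass, so serializing it also tries to serialize `formats`.
  class InnerTagCalculation extends Serializable {
    def test1(x: String): String = x + " test1"
  }
}

// Top-level class: there is no enclosing instance to drag along.
class TagCalculation extends Serializable {
  def test1(x: String): String = x + " test1"
}

// Hypothetical helper, not a Spark API.
object SerializationCheck {
  // Attempts plain Java serialization of `obj` and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

`new caller.InnerTagCalculation` (for some `val caller = new CallingClass`) fails the check because its outer reference reaches `formats`, while `new TagCalculation` passes because nothing else is reachable from it.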