
I am reading a file and trying to map the values using a function, but it throws a NotSerializableException. Below is the code I am running:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  def parseLine(line: String) = {
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(parseLine)
  }
}

When I run the above code, it gives me the error below:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.map(RDD.scala:370)
    at com.sundogsoftware.spark.MinTemperatures$.main(MinTemperatures.scala:32)
    at com.sundogsoftware.spark.MinTemperatures.main(MinTemperatures.scala)

Caused by: java.io.NotSerializableException: com.sundogsoftware.spark.MinTemperatures$
Serialization stack:
    - object not serializable (class: com.sundogsoftware.spark.MinTemperatures$, value: com.sundogsoftware.spark.MinTemperatures$@41fed14f)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.sundogsoftware.spark.MinTemperatures$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/sundogsoftware/spark/MinTemperatures$.$anonfun$main$1:(Lcom/sundogsoftware/spark/MinTemperatures$;Ljava/lang/String;)Lscala/Tuple3;, instantiatedMethodType=(Ljava/lang/String;)Lscala/Tuple3;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)

But when I run the same code with an anonymous function, it runs successfully:

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min

/** Find the minimum temperature by weather station */
object MinTemperatures {

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "MinTemperatures")

    // Read each line of input data
    val lines = sc.textFile("../DataSet/1800.csv")

    // Convert to (stationID, entryType, temperature) tuples
    val parsedLines = lines.map(x => {
      val fields = x.split(",");
      val stationID = fields(0);
      val entryType = fields(2);
      val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f;
      (stationID, entryType, temperature)
    })

    // Filter out all but TMIN entries
    val minTemps = parsedLines.filter(x => x._2 == "TMIN")

    // Convert to (stationID, temperature)
    val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))

    // Reduce by stationID retaining the minimum temperature found
    val minTempsByStation = stationTemps.reduceByKey((x, y) => min(x, y))

    // Collect, format, and print the results
    val results = minTempsByStation.collect()

    for (result <- results.sorted) {
      val station = result._1
      val temp = result._2
      val formattedTemp = f"$temp%.2f F"
      println(s"$station minimum temperature: $formattedTemp")
    }

  }
}

Output:

EZE00100082 minimum temperature: 7.70 F
ITE00100554 minimum temperature: 5.36 F

As you can see above, when I use the named function (parseLine) inside map it gives an error, but the same program runs successfully when I use an anonymous function in map instead.

I looked into a few blogs but didn't find the reason for the error. Could anyone help me understand this?

Move the function to another object and it should work. BTW, a piece of advice: be explicit about the return type. – Luis Miguel Mejía Suárez
Hi @LuisMiguelMejíaSuárez. Your suggestion worked, thank you. I am new to Scala, so could you explain the problem in my code and the solution you suggested, or point me to a reference that explains it? – tejas
This is part of how Spark works. It needs to serialize some closures (anonymous functions together with their context). Here the whole MinTemperatures object is the context of parseLine (it is its parent), so it gets captured to be serialized. Since it cannot be serialized, this exception is thrown. I'm not using Spark and cannot say anything more :( – Scalway
@tejas as Scalway said, the entire object has to be serialized, and it couldn't be, probably because of the logger, the main method, or something internal to Spark. For that reason it is common to have a utils object with all those functions. – Luis Miguel Mejía Suárez
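
For illustration, this is roughly what the compiler turns lines.map(parseLine) into in the failing version (a sketch of the eta-expansion, not literal compiler output); it matches the SerializedLambda entry in the stack trace, which shows the MinTemperatures object as the single captured argument:

// lines.map(parseLine) is expanded into a lambda that invokes a method
// of the enclosing MinTemperatures object, so that object is captured
// and has to be serialized along with the task.
val parsedLines = lines.map(line => MinTemperatures.parseLine(line))

// The anonymous-function version does not reference anything outside its
// own parameter, so nothing non-serializable is dragged into the closure.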

1 Answer


This issue doesn't seem to be sbt or dependency related. As I checked, the error means that the closure Spark ships to the executors is not serializable: the lambda generated for map(parseLine) captures the enclosing MinTemperatures object, and that object cannot be serialized. I created a new object, defined the same code there, and it worked.
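
As a concrete version of the fix suggested in the comments and in this answer, here is a minimal sketch. The helper object name ParsingUtils is made up for the example; the point is that parseLine no longer lives inside MinTemperatures, so the lambda passed to map does not capture that object. Extending Serializable is a common extra precaution for Spark helper objects rather than a strict requirement here.

package com.sundogsoftware.spark

// Hypothetical helper object holding the parsing logic.
// Serializable is a precaution so the object can be shipped to executors
// if Spark ends up capturing a reference to it.
object ParsingUtils extends Serializable {
  def parseLine(line: String): (String, String, Float) = {
    val fields = line.split(",")
    val stationID = fields(0)
    val entryType = fields(2)
    val temperature = fields(3).toFloat * 0.1f * (9.0f / 5.0f) + 32.0f
    (stationID, entryType, temperature)
  }
}

With that helper in place, the driver code from the question only changes in one line:

val parsedLines = lines.map(ParsingUtils.parseLine)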