0 votes

I'm trying to write an HDFS output file using Scala, and I'm receiving the error below:

exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1893) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.foreach(RDD.scala:868) Caused by: java.io.NotSerializableException: java.io.PrintWriter Serialization stack:

Every time the counter reaches line 23, I need to write the accumulated line to the output file.

Code Source:

package com.mycode.logs;

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import scala.io._
import java.io.PrintWriter;

/**
 * @author RondenaR
 * 
 */
object NormalizeMSLogs{

  def main(args: Array[String]){
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String){
    System.out.println("INFO: ****************** started ******************")

    // **** SetMaster is Local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count :Int = 0
    var lineF :String = ""

    hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()

      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for(line <- hdfsfileIn){
        //System.out.println("line = " + line)
        count += 1

        if(count != 23){
          lineF = lineF + line + ", "
        }

        if(count == 23){
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF) 
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop
    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
}
You are trying to use writer in a distributed block, which looks suspicious to me. I would try map instead of foreach; then you have an RDD as a result, which you can iterate over and read/write. You probably need a shuffle stage here anyway; IMO there is no way to avoid it, since HDFS has its own idea of how to distribute the file. – Victor Moroz
After normalizing the file, could I output it to a list and, once the list is complete, put it into a Hive table? – Rodrigo Rondena

1 Answer

1 vote

Not only is the PrintWriter object writer not serializable: you also cannot use the SparkContext (sc) inside the foreach. It is a construct that exists only on the driver, and it does not make sense to send it across the wire to the workers.
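One way around this is to keep the writer on the driver and bring the data to it, rather than the other way round. Here is a minimal sketch of that idea (not the poster's exact code; the paths and the 23-line grouping are illustrative): toLocalIterator streams partitions back to the driver, so no PrintWriter is ever captured in an RDD closure.

import java.io.PrintWriter
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object DriverSideWrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("driver-side-write").setMaster("local"))
    val hdfs = FileSystem.get(sc.hadoopConfiguration)

    // The writer lives only on the driver and is never referenced inside an RDD closure
    val writer = new PrintWriter(hdfs.create(new Path("/tmp/mySample.txt"))) // illustrative output path

    val lines = sc.textFile("/user/temporary/somefile") // illustrative input path

    // Stream the RDD contents back to the driver partition by partition,
    // group them in chunks of 23 lines, and write each chunk as one record
    lines.toLocalIterator.grouped(23).foreach { group =>
      writer.write(group.mkString(", ") + "\n")
    }

    writer.close()
    sc.stop()
  }
}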

You should take some time to think about which types of objects make sense to send over the wire. Pointers, streams, and handles do NOT make sense. Structs, Strings, and primitives do make sense to include in a closure (or to broadcast).
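If a single driver-side writer is not required at all, another sketch in the spirit of the comment above is to build the records with RDD transformations over plain Strings (which serialize fine) and let Spark write them itself with saveAsTextFile. The input/output paths and the msNode value below are placeholders, and grouping into exact blocks of 23 lines is done per partition for simplicity:

import org.apache.spark.{SparkConf, SparkContext}

object DistributedWrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("distributed-write").setMaster("local"))
    val msNode = "node01" // a plain String: cheap and safe to capture in the closure

    val lines = sc.textFile("/user/temporary/somefile") // illustrative input path

    // Within each partition, join every 23 lines into one comma-separated record
    // and append the node name, mirroring the logic in the question
    val records = lines.mapPartitions { it =>
      it.grouped(23).map(group => group.mkString(", ") + ", " + msNode)
    }

    records.saveAsTextFile("/tmp/mySampleOutput") // each executor writes its own part file
    sc.stop()
  }
}

Note that saveAsTextFile produces a directory of part files rather than a single file, which is usually what you want when the data is written in parallel.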