2 votes

I am trying to do multiclass classification in Spark. I have 300,000 predefined classes. There is no problem while transforming the data, but when I try to train the model I get an OutOfMemoryError. How can I solve this problem?

import java.util.Locale

import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}
import org.apache.spark.sql.SparkSession

object Test {

  var num = 50
  var savePath = "c:/Temp/SparkModel/"
  var stemmer = Resha.Instance

  var STOP_WORDS: Set[String] = Set()

  def cropSentence(s: String) = {
    s.replaceAll("\\([^\\)]*\\)", "")
      .replaceAll(" - ", " ")
      .replaceAll("-", " ")
      .replaceAll("  +", " ")
      .replaceAll(",", " ").trim()
  }

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("Test").setMaster("local[*]")
      .set("spark.sql.warehouse.dir", "D:/Temp/wh")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "4g")
      .set("spark.hadoop.validateOutputSpecs", "false")

    val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
    import spark.implicits._

    val mainDataset = spark.sparkContext.textFile("file:///C:/Temp/classifications.csv")
      .map(_.split(";"))
      .map { tokens =>
        val list = new ListBuffer[String]()
        val token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
        token0.split("\\s+").foreach(word => list += stemmer.stem(word))
        (tokens(1), tokens(0), list.toList.mkString(" "))
      }.toDF("className", "productNameOrg", "productName")


    val classIndexer = new StringIndexer()
      .setInputCol("className")
      .setOutputCol("label")

    val classIndexerModel = classIndexer.fit(mainDataset)
    val mainDS = classIndexerModel.transform(mainDataset)
    classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

    // Tokenizer
    val tokenizer = new Tokenizer()
      .setInputCol("productName")
      .setOutputCol("words_nonfiltered")

    // StopWords
    val remover = new StopWordsRemover()
      .setInputCol("words_nonfiltered")
      .setOutputCol("words")
      .setStopWords(Array[String]("stop1", "stop2", "stop3"))

    // CountVectorizer
    val countVectorizer = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")

    val rfc = new RandomForestClassifier()
      .setLabelCol("label")
      .setNumTrees(50)
      .setMaxDepth(15)
      .setFeatureSubsetStrategy("auto")
      .setFeaturesCol("features")
      .setImpurity("gini")
      .setMaxBins(32)

    val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, rfc))
    val train = mainDS
    val model = pipeline.fit(train) // <============= OOM
    model.write.overwrite.save(savePath + "RandomForestClassifier")

  }
}

Error:

16/10/21 00:54:23 INFO ExternalAppendOnlyMap: Thread 101 spilling in-memory map of 2.9 GB to disk (1 time so far)
16/10/21 00:56:58 INFO ExternalAppendOnlyMap: Thread 98 spilling in-memory map of 2.7 GB to disk (2 times so far)
16/10/21 00:57:05 INFO ExternalAppendOnlyMap: Thread 101 spilling in-memory map of 2.7 GB to disk (2 times so far)
Exception in thread "shuffle-server-0" java.lang.OutOfMemoryError: Java heap space
16/10/21 01:02:37 WARN SingleThreadEventExecutor: Unexpected exception from an event executor: 
java.lang.OutOfMemoryError: Java heap space
16/10/21 01:02:43 WARN TaskMemoryManager: leak 1575.8 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@18f269e7
16/10/21 01:02:42 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:176)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249)
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:172)
    at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:67)
16/10/21 01:02:37 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
16/10/21 01:02:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 126580 ms exceeds timeout 120000 ms
16/10/21 01:03:56 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 126580 ms
16/10/21 01:03:58 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 25)
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
Exception in thread "Executor task launch worker-4" java.lang.OutOfMemoryError: Java heap space
16/10/21 01:06:00 WARN TaskSetManager: Lost task 1.0 in stage 12.0 (TID 26, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 126580 ms
16/10/21 01:06:00 ERROR TaskSetManager: Task 1 in stage 12.0 failed 1 times; aborting job
Are you running it in cluster mode or in client mode? - Shivansh
Running in client mode with 8 cores. - kkurt

1 Answer

1 vote

This usually occurs when the driver memory is not tuned properly.

What you are doing wrong here is passing 4g as the driver memory by setting it in the SparkConf. As the documentation states, this does not work in client mode, because the driver JVM has already started by the time your SparkConf is read. You have to pass the driver memory explicitly when submitting the application.
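For example, assuming the application is packaged as a jar (the jar path below is just a placeholder), the memory settings would go on the spark-submit command line instead of into the code:

spark-submit \
  --class Test \
  --master local[*] \
  --driver-memory 12g \
  /path/to/your-application.jar   # placeholder jar path

If you launch the job straight from an IDE instead of through spark-submit, increasing the heap of the run configuration's JVM (e.g. -Xmx12g) has the same effect, since in local mode everything runs inside that single driver JVM.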

Take a look at the available properties in the configuration documentation: https://spark.apache.org/docs/1.6.1/configuration.html#available-properties
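Alternatively, the same properties can be put into conf/spark-defaults.conf, which is read before the driver JVM starts (the values below just mirror the ones from the question):

# conf/spark-defaults.conf
spark.driver.memory     12g
spark.executor.memory   12g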