5
votes

I am running into an issue where YARN is killing my containers for exceeding memory limits:

Container killed by YARN for exceeding memory limits. physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

I have 20 nodes that are of m3.2xlarge so they have:

cores: 8
memory: 30
storage: 200 gb ebs

The gist of my application is that I have a couple 100k assets for which I have historical data generated for each hour of the last year, with a total dataset size of 2TB uncompressed. I need to use this historical data to generate a forecast for each asset. My setup is that I first use s3distcp to move the data stored as indexed lzo files to hdfs. I then pull the data in and pass it to sparkSql to handle the json:

 val files = sc.newAPIHadoopFile("hdfs:///local/*",
  classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text],conf)
val lzoRDD = files.map(_._2.toString)
val data = sqlContext.read.json(lzoRDD)

I then use a groupBy to group the historical data by asset, creating a tuple of (assetId,timestamp,sparkSqlRow). I figured this data structure would allow for better in memory operations when generating the forecasts per asset.

 val p = data.map(asset =>  (asset.getAs[String]("assetId"),asset.getAs[Long]("timestamp"),asset)).groupBy(_._1)

I then use a foreach to iterate over each row, calculate the forecast, and finally write the forecast back out as a json file to s3.

 p.foreach{ asset =>
  (1 to dateTimeRange.toStandardHours.getHours).foreach { hour =>
    // determine the hour from the previous year
    val hourFromPreviousYear = (currentHour + hour.hour) - timeRange
    // convert to seconds
    val timeToCompare = hourFromPreviousYear.getMillis
    val al = asset._2.toList

    println(s"Working on asset ${asset._1} for hour $hour with time-to-compare: $timeToCompare")
    // calculate the year over year average for the asset
    val yoy = calculateYOYforAsset2(al, currentHour, asset._1)
    // get the historical data for the asset from the previous year
    val pa = asset._2.filter(_._2 == timeToCompare)
      .map(row => calculateForecast(yoy, row._3, asset._1, (currentHour + hour.hour).getMillis))
      .foreach(json => writeToS3(json, asset._1, (currentHour + hour.hour).getMillis))
  }
}
  • Is there a better way to accomplish this so that I don't hit the memory issue with YARN?
  • Is there a way to chunk the assets so that the foreach only operates on about 10k at a time vs all 200k of the assets?

Any advice/help appreciated!

1
You might a similar issue as in the example I have given in this answer : stackoverflow.com/a/36475604/3415409 - eliasah

1 Answers

0
votes

Its not your code. And don't worry foreach does not run all those lambdas concurrently. The problem is that Spark's default value of spark.yarn.executor.memoryOverhead (or recently renamed in 2.3+ to spark.executor.memoryOverhead) is overly conservative which causes your executors to be killed when under load.

The solution is (as suggested by the error message) to increase that value. I would start with setting it to 1GB (set to 1024) or more if you are requesting lots of memory for each executor. The goal is to get jobs running without any executors being killed.

Alternatively if you control the cluster, you could disable YARN memory enforcement via by setting the configs yarn.nodemanager.pmem-check-enabled and yarn.nodemanager.vmem-check-enabled to false in yarn-site.xml