I am hitting an OutOfMemoryError (OOME) when I save big data to HDFS. Here is the code:
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang.StringUtils // or org.apache.commons.lang3.StringUtils

// collect "bad" rows on the driver through an accumulable collection
val accumulableCollection = sc.accumulableCollection(ArrayBuffer[String]())

val rdd = textfile.filter(row => {
  if (row.endsWith(",")) {
    // trailing comma: divert the row to the bad-row collection
    accumulableCollection += row
    false
  } else if (row.length < 100) {
    // suspiciously short row: divert it as well
    accumulableCollection += row
    false
  } else {
    true // keep the row
  }
})
rdd.cache()

val rdd2 = rdd.map(_.split(","))
val rdd3 = rdd2.filter(row => {
  var valid = true
  for ((k, v) <- fieldsMap if valid) { // fieldsMap: column index -> field name, defined elsewhere
    if (StringUtils.isBlank(row(k)) || "NULL".equalsIgnoreCase(row(k))) {
      accumulableCollection += row.mkString(",")
      valid = false
    }
  }
  valid
})

// ship the collected bad rows back to the executors and write them out
sc.parallelize(accumulableCollection.value).saveAsTextFile(hdfsPath)
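(From the logs below, the failure comes from that last saveAsTextFile. One variation I wondered about but have not tried: giving parallelize an explicit slice count so each task carries a smaller chunk of the driver-side collection:)

// hypothetical tweak: more slices means a smaller serialized payload per task (100 is an arbitrary guess)
sc.parallelize(accumulableCollection.value, 100).saveAsTextFile(hdfsPath)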
These are the options I am passing to spark-submit:
--num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2
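For completeness, the full launch command looks roughly like this (the class name comes from the stack trace below; the package and jar names are placeholders):

spark-submit --class com.example.WriteToHdfs \
  --master yarn-cluster \
  --num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2 \
  writetohdfs.jar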
Here is the relevant log output:
15/04/12 18:46:49 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (37528 KB). The maximum recommended task size is 100 KB.
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, worker4, PROCESS_LOCAL, 38429279 bytes)
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, worker3, PROCESS_LOCAL, 38456846 bytes)
15/04/12 18:46:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 10, worker4, PROCESS_LOCAL, 38426488 bytes)
15/04/12 18:46:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 11, worker3, PROCESS_LOCAL, 38445061 bytes)
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Cancelling stage 4
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
15/04/12 18:46:51 INFO scheduler.DAGScheduler: Job 4 failed: saveAsTextFile at WriteToHdfs.scala:87, took 5.713617 s
15/04/12 18:46:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Serialized task 8:0 was 38617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.)
Exception in thread "Driver" org.apache.spark.SparkException: Job aborted due to stage failure: **Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes)** - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.
Two questions about that message:
(1) Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) --- what exactly is this ~30 MB serialized task?
(2) Consider using broadcast variables for large values --- what should the broadcast variable be? rdd2? Or accumulableCollection, since that is what I am writing to HDFS?
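For (2), is the intended pattern something like the following? This is only my guess at what the message suggests, and I do not see how it would connect to the final saveAsTextFile:

// my guess: broadcast the large driver-side value once per executor
// instead of shipping a copy of it inside every serialized task
val badRowsBc = sc.broadcast(accumulableCollection.value)
// closures running on the executors would then read badRowsBc.value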
When I increased the frameSize (flag shown at the end), the error became java.lang.OutOfMemoryError: Java heap space, and I had to raise both driver-memory and executor-memory to 2G for the job to succeed. If accumulableCollection.value.length is 500,000, I need 3G. Is this normal?
The file is only 146 MB and contains 200,000 rows (that is the 2G case); in HDFS it is split into 2 partitions of 73 MB each. That works out to roughly 730 bytes per row, which makes the multi-gigabyte heap requirement surprising to me.
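For reference, I raised the frame size with a flag along these lines (64 here is an illustrative value; the setting is in MB):

--conf spark.akka.frameSize=64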