3
votes

I am getting an OutOfMemoryError (OOME) when I save big data to HDFS:

val accumulableCollection = sc.accumulableCollection(ArrayBuffer[String]())
val rdd = textfile.filter(row => {
    if (row.endsWith(",")) {
        accumulableCollection += row
        false
    } else if (row.length < 100) {
        accumulableCollection += row
        false
    } else {
        true
    }
})
rdd.cache()
val rdd2 = rdd.map(_.split(","))
val rdd3 = rdd2.filter(row => {
    var valid = true
    for((k,v) <- fieldsMap if valid ) {
        if (StringUtils.isBlank(row(k)) || "NULL".equalsIgnoreCase(row(k))) {
            accumulableCollection += row.mkString(",")
            valid = false
        }
    }
    valid
})
sc.parallelize(accumulableCollection.value).saveAsTextFile(hdfsPath)

I am using this in spark-submit:

--num-executors 2 --driver-memory 1G --executor-memory 1G --executor-cores 2

Here's the output of the logs:

15/04/12 18:46:49 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (37528 KB). The maximum recommended task size is 100 KB.
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 8, worker4, PROCESS_LOCAL, 38429279 bytes)
15/04/12 18:46:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 9, worker3, PROCESS_LOCAL, 38456846 bytes)
15/04/12 18:46:50 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 10, worker4, PROCESS_LOCAL, 38426488 bytes)
15/04/12 18:46:51 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 11, worker3, PROCESS_LOCAL, 38445061 bytes)
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Cancelling stage 4
15/04/12 18:46:51 INFO cluster.YarnClusterScheduler: Stage 4 was cancelled
15/04/12 18:46:51 INFO scheduler.DAGScheduler: Job 4 failed: saveAsTextFile at WriteToHdfs.scala:87, took 5.713617 s
15/04/12 18:46:51 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Serialized task 8:0 was 38617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.)
Exception in thread "Driver" org.apache.spark.SparkException: Job aborted due to stage failure: **Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes)** - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

Serialized task 8:0 was 30617206 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) --- (1) what is the 30MB serialized task?

Consider using broadcast variables for large values. --- (2) what should be the broadcast variable? rdd2? or the accumulableCollection since that is what I am writing to HDFS?

When I increased the frameSize, the error became java.lang.OutOfMemoryError: Java heap space, so I had to increase driver-memory and executor-memory to 2G for it to work. If accumulableCollection.value.length is 500,000, I need to use 3G. Is this normal?

The file is only 146MB and contains 200,000 rows (for the 2G case). In HDFS it is split into 2 partitions of about 73MB each.

Sometimes this is caused by configuration problems. How are you initializing your SparkContext? Have you tried submitting this locally? Have you tried running the same code in the interactive shell connected to your cluster/standalone manager? – Myles Baker
I tried submitting via --master yarn-cluster and with local[2], but I get the same results. I am using val sc = new SparkContext(new SparkConf()). – sophie
Are you putting the entire data set into memory? – Myles Baker

2 Answers

1
votes

It means pretty much what it says. You are trying to serialize a single object which is very large. You should probably rewrite your code to not do this.

For example, it is not clear why you are updating an accumulable collection at all, and doing so inside a filter, which may even execute multiple times. You then cache the RDD, even though you have already tried to keep a copy of much of it on the driver. Then you add more values to that driver-side collection and turn it back into an RDD again.

Why the accumulableCollection at all? Just operate on RDDs. There is a lot of redundancy here.
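
As a rough sketch of what that could look like (assuming textfile, fieldsMap, StringUtils, and hdfsPath from the question; the names parsed, isRejected, and validRows are just illustrative), the rejected rows stay in an RDD and are written straight from the executors, so nothing has to be collected on the driver:

val parsed = textfile.map(row => (row, row.split(",")))

def isRejected(row: String, cols: Array[String]): Boolean =
    row.endsWith(",") ||
    row.length < 100 ||
    fieldsMap.exists { case (k, _) =>
        StringUtils.isBlank(cols(k)) || "NULL".equalsIgnoreCase(cols(k))
    }

// rejected rows are written directly to HDFS; no driver-side collection needed
parsed.filter { case (row, cols) => isRejected(row, cols) }
      .map(_._1)
      .saveAsTextFile(hdfsPath)

// the remaining rows continue through the pipeline as split columns
val validRows = parsed.filter { case (row, cols) => !isRejected(row, cols) }
      .map(_._2)

Writing the rejected rows with saveAsTextFile directly avoids both the oversized serialized task and the parallelize() call on a driver-side collection.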

4
votes

The central programming abstraction in Spark is the RDD, and you can create one in two ways:

(1) parallelizing an existing collection in your driver program, or (2) referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

The parallelize() method (1) requires that your entire dataset fit in memory on one machine (p. 26, Learning Spark).

Method (2), referred to as External Datasets, should be used for large files.
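
As a minimal illustration of the two approaches (the HDFS path here is just a placeholder):

// (1) parallelize: the whole collection must first exist in driver memory
val small = sc.parallelize(Seq("a", "b", "c"))

// (2) external dataset: read directly from storage; partitions are loaded on the executors
val big = sc.textFile("hdfs:///path/to/large/input")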

The following line creates an RDD from the contents of accumulableCollection.value and requires the entire collection to fit in memory on a single machine (the driver):

sc.parallelize(accumulableCollection.value)

You might also exceed memory when you cache the RDD:

rdd.cache()

This means the entire rdd (the filtered textfile) is stored in memory. You most likely do not want to do this. See the Spark documentation for advice on choosing a storage level for your data.
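
If the cached data does not fit in memory, one option (a minimal sketch, using the rdd from the question) is to persist with a level that can spill to disk instead of the default MEMORY_ONLY:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK keeps partitions in memory when possible and spills the rest to disk
rdd.persist(StorageLevel.MEMORY_AND_DISK)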