0 votes

Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large. Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller.
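For reference, the "increase the level of parallelism" fix from the passage above usually amounts to passing an explicit partition count to the shuffle operation, or raising spark.default.parallelism. A minimal sketch, assuming a toy pair RDD and an arbitrary value of 400:

    import org.apache.spark.sql.SparkSession

    object IncreaseParallelism {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("increase-shuffle-parallelism")
          // Default partition count for RDD shuffles (illustrative value).
          .config("spark.default.parallelism", "400")
          .getOrCreate()
        val sc = spark.sparkContext

        // Toy pair RDD standing in for the real input.
        val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, 1))

        // Passing an explicit partition count keeps each reduce task's
        // working set small, which is the fix the documentation suggests.
        val counts = pairs.reduceByKey(_ + _, 400)
        counts.count()

        spark.stop()
      }
    }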

I think of it this way; please correct me if I am wrong.

Suppose there are 2 Data Nodes to process the dataset, and together these nodes have 32 GB of memory (16 GB per Data Node). The dataset size is 100 GB, and let us suppose that when Spark reads this data it is partitioned into 10 partitions of 10 GB each. Obviously the 100 GB file cannot fit into 32 GB of RAM at once, so the partitions have to be loaded into memory and processed iteratively. So I assume it works as below.

First iteration: 2 partitions, 10 GB each, are loaded into memory (one per Data Node).
Second iteration: 2 partitions, 10 GB each, are loaded into memory (one per Data Node).
....
Fifth iteration: 2 partitions, 10 GB each, are loaded into memory (one per Data Node).

If this is how Spark processes the data, then during every iteration only 2 partitions are loaded into memory. Does that mean the other partitions, which could not be accommodated in memory, were already read but spilled to disk and are waiting for memory to be freed? Or are those partitions not read at all, and only read once resources become available? Which is true?
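One way to check this model against a real cluster is to compare how many partitions Spark actually created with how many tasks it can run at once. A minimal sketch, assuming a hypothetical HDFS path:

    import org.apache.spark.sql.SparkSession

    object PartitionInspection {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("partition-inspection").getOrCreate()
        val sc = spark.sparkContext

        // Hypothetical 100 GB input.
        val lines = sc.textFile("hdfs:///data/big_100gb_file")

        // How many partitions Spark created (driven by input splits, not by RAM).
        println(s"input partitions: ${lines.getNumPartitions}")

        // Rough upper bound on how many of those partitions run concurrently
        // (one task per available executor core); the remaining partitions are
        // simply not scheduled yet -- they are neither pre-read nor spilled.
        println(s"default parallelism: ${sc.defaultParallelism}")

        spark.stop()
      }
    }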

During processing, if there is a need for a groupBy/reduceBy/join, a shuffle is required. So if one of the shuffle partitions is greater than the RAM size, then the job will fail with an OOM error. Example: the 10 partitions were processed and shuffled, and now the data lands in only 4 shuffle partitions of 25 GB each (the default number of shuffle partitions is 200, but only 4 of them hold data; the remaining ones are empty). Since the shuffle partition size is greater than the 16 GB of RAM, will the Spark job fail? Is my understanding correct?
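For the shuffle side of this scenario, the number of shuffle partitions is configurable, although raising it does not help if the data is heavily skewed onto a few keys. A rough sketch, assuming a toy skewed dataset and an illustrative value of 400:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    object ShufflePartitionTuning {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("shuffle-partition-tuning")
          // DataFrame/SQL shuffles: raise this above the default of 200 so each
          // shuffle partition stays well below executor memory (illustrative value).
          .config("spark.sql.shuffle.partitions", "400")
          .getOrCreate()

        // Toy skewed dataset: all rows fall onto 4 keys, so only 4 of the
        // shuffle partitions receive data and the rest stay empty.
        val df = spark.range(1000000).withColumn("key", col("id") % 4)

        // groupBy forces a shuffle; with skew this heavy, a single shuffle
        // partition can still be very large no matter how many partitions exist.
        df.groupBy("key").count().show()

        spark.stop()
      }
    }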

I understand that your data does not really need to fit in memory; Spark processes the data on a per-partition basis. But my question is: what if a single partition itself does not fit in memory? Would Spark still spill the data to disk and carry on processing, or would it fail with an OOM error?
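As a side note on spilling: for narrow transformations Spark streams over a partition with iterators, and storage levels only come into play when caching. A small sketch, assuming a hypothetical input path, of requesting MEMORY_AND_DISK so cached partitions that do not fit in memory can go to disk:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    object SpillSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("spill-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Hypothetical large input.
        val lines = sc.textFile("hdfs:///data/big_100gb_file")

        // Narrow transformations such as map/filter stream through a partition
        // with iterators, so the whole partition does not have to sit in memory.
        val lengths = lines.map(_.length)

        // Storage levels matter only when caching: MEMORY_AND_DISK lets cached
        // partitions that do not fit in memory be written to disk instead.
        lengths.persist(StorageLevel.MEMORY_AND_DISK)
        println(lengths.count())

        spark.stop()
      }
    }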

My second question is: suppose another Spark job (Job2) is triggered while the above Spark job (Job1) is executing, and it also has a 100 GB file to process with 10 partitions of 10 GB each. While Job1's first iteration is executing, there is only a 6 GB free slot available in memory on each node, so Job2's 10 GB partition cannot be loaded into memory. Will Job2 wait till memory is freed up, or will this job also fail with an OOM error?
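On the second question, each Spark application requests its own executors from the cluster manager. The sketch below shows hypothetical resource settings for Job2 (the sizes and the path are illustrative assumptions, not recommendations):

    import org.apache.spark.sql.SparkSession

    object Job2ResourceRequest {
      def main(args: Array[String]): Unit = {
        // Each application asks the cluster manager (YARN, Kubernetes, ...) for
        // its own executors; the sizes below are illustrative only.
        val spark = SparkSession.builder()
          .appName("job2")
          .config("spark.executor.instances", "2")
          .config("spark.executor.memory", "6g")
          .config("spark.executor.cores", "2")
          .getOrCreate()

        // If the cluster cannot grant these executors yet because Job1 holds the
        // memory, the request generally stays pending in the scheduler queue
        // rather than failing with an OOM inside Job2.
        val df = spark.read.text("hdfs:///data/another_100gb_file") // hypothetical path
        println(df.count())

        spark.stop()
      }
    }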


1 Answer

1 vote

The explanation you quoted at the top of your question is correct.

On your comments:

  • Unless you explicitly repartition, your partitions will be related to the HDFS block size, 128 MB each, and there will be as many of them as it takes to make up the file.

  • Then you have a number of executors, say 2, per Worker / Data Node. So at most 4 tasks / partitions will be active at any given time.

    • What would be the point of loading all partitions into memory if you can service at most 4 at a time? You would just be clogging up the system to the detriment of other Spark apps. It is all much like a normal OS in that respect. (The sketch after this list illustrates the partition count and the concurrency cap.)
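A small sketch of the two points above, i.e. block-size-driven partition counts and the cap on concurrent tasks; the path and resource numbers are assumptions:

    import org.apache.spark.sql.SparkSession

    object ConcurrencySketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("concurrency-sketch")
          // 2 executors on each of the 2 Data Nodes, one core each:
          // at most 4 tasks / partitions in flight at any given time.
          .config("spark.executor.instances", "4")
          .config("spark.executor.cores", "1")
          .getOrCreate()
        val sc = spark.sparkContext

        // Partition count follows the input splits (HDFS block size, 128 MB by
        // default), so a 100 GB file gives roughly 800 partitions regardless of RAM.
        val lines = sc.textFile("hdfs:///data/big_100gb_file")
        println(s"partitions: ${lines.getNumPartitions}")
        println(s"max concurrent tasks: ${sc.defaultParallelism}")

        spark.stop()
      }
    }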

Of course it is a bit more complicated; e.g. if you have 10 Data Nodes and allocate only 2 Executors, there is traffic to move stuff about. But let's keep it simple.

OOM errors only occur if a partition exceeds the maximum partition size. For the rest, disk space is needed for spilling.
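If you want to bound how large a file-based input partition can get, the usual knob on the DataFrame reader path is spark.sql.files.maxPartitionBytes. A minimal sketch with an illustrative value and a hypothetical path:

    import org.apache.spark.sql.SparkSession

    object PartitionSizeBound {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("partition-size-bound")
          // Cap file-based input partition size (default 128 MB); smaller
          // partitions keep each task's working set comfortably inside memory.
          .config("spark.sql.files.maxPartitionBytes", "64MB")
          .getOrCreate()

        val df = spark.read.text("hdfs:///data/big_100gb_file") // hypothetical path
        println(s"partitions: ${df.rdd.getNumPartitions}")

        spark.stop()
      }
    }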