3
votes

My Spark cluster hangs when I try to cache() or persist(MEMORY_ONLY_SER()) my RDDs. It works great and computes results in about 7 minutes if I don't use cache().

I've got 6 c3.xlarge EC2 instances (4 cores, 7.5 GB RAM each), which gives 24 cores and 37.7 GB in total.

I run my application with the following command on the master:

SPARK_MEM=5g MEMORY_FRACTION="0.6" SPARK_HOME="/root/spark" java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis

The data set is about 50 GB of data partitioned into 24 files. I compressed it and stored it in an S3 bucket as 24 files (each between 7 MB and 300 MB in size).

I can't find a reason for this behaviour of my cluster, but it seems that Spark consumes all available memory and gets stuck in a GC loop. When I look at the verbose GC output, I see cycles like the ones below:

[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
[GC 5208301K(5208832K), 0,2403480 secs]
[Full GC 5208831K->5208344K(5208832K), 9,7497710 secs]
[Full GC 5208829K->5208366K(5208832K), 9,7542880 secs]
[Full GC 5208831K->5208415K(5208832K), 9,7574860 secs]

This eventually leads to messages like:

WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-xx-xx-xxx-xxx.eu-west-1.compute.internal, 60048, 0) with no recent heart beats: 64828ms exceeds 45000ms

...and all progress in the computation stops. It looks as if memory is 100% consumed, but I've tried machines with more RAM (30 GB each) and the effect is the same.

What might be the reason for this behaviour? Could anybody help?

2
Your app is continuously performing GC with very little memory recovered; something is configured or coded wrongly that is making it fill up memory. - jmj
Yes, that is what I can see from the logs too. The question is really why it happens. - Bartek
@Bartek It is really hard to tell without the actual code. But 50 GB of files is a lot. If you load all of them at once and try to process them, of course you will have serious memory pressure. - Eugene
@Eugene Thanks for your response. As far as I understand, Apache Spark should handle such amounts of data and either cache part of it on disk or re-read it in case of memory problems. I didn't post the code, as I don't suspect a bug in it: 1) it is very simple; 2) it works fine if I don't cache/persist. I can paste it if you find it useful. - Bartek
Try caching a small amount of data, say a sample of 1 million records, and look at the Spark web UI to see how much memory is consumed; from that you can estimate the total memory you need. - Thomas Decaux
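
A minimal sketch of what that comment suggests (the path, sample fraction and storage level below are placeholders, not values from the question):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("spark://master:7077", "CacheSizingSketch")

// Hypothetical input path; take roughly 0.1% of the data without replacement.
val sample = sc.textFile("s3n://my-bucket/input/").sample(false, 0.001, 42)

// Persist and force materialization so the "Storage" tab of the web UI
// reports how much memory the cached sample actually occupies.
sample.persist(StorageLevel.MEMORY_ONLY_SER)
sample.count()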

2 Answers

5
votes

Try using more partitions; you should have 2-4 per CPU. In my experience, increasing the number of partitions is often the easiest way to make a program more stable (and often faster).

By default I think your code will use 24 partitions, but for 50 GB of data that is far too few. I'd try at least a few hundred partitions.
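
A minimal sketch of the idea, assuming a placeholder S3 path and a target of roughly 200 partitions (both illustrative, not taken from the question):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("spark://master:7077", "PartitioningSketch")

// Ask for more input splits up front; the second argument to textFile is a
// *minimum* number of partitions, so Spark may create even more.
val lines = sc.textFile("s3n://my-bucket/input/", 200)

// Alternatively, reshuffle an existing RDD into more partitions before
// caching it (repartition performs a full shuffle).
val repartitioned = lines.repartition(200)
repartitioned.persist(StorageLevel.MEMORY_ONLY_SER)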

Next, you use SPARK_MEM=5g but say each node has 7.5 GB, so you might as well use SPARK_MEM=7500m.

You could also try increasing the memory fraction, but I think the above is more likely to help.
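
For reference, a hedged sketch of how those settings could be expressed through a SparkConf instead of the SPARK_MEM environment variable (the master URL and values are placeholders; spark.storage.memoryFraction is the pre-1.6 name of the storage-fraction setting):

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder master URL and values; adjust them to the actual cluster.
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("MemoryConfigSketch")
  .set("spark.executor.memory", "7500m")       // per-executor heap (replaces SPARK_MEM)
  .set("spark.storage.memoryFraction", "0.7")  // fraction of the heap reserved for cached blocks

val sc = new SparkContext(conf)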

General points: use HDFS for your files, not S3; it's hugely faster. Also make sure you munge your data properly before caching it - e.g. if you have TSV data with 100 columns but only use 10 of the fields, extract those fields before you try to cache.
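
A minimal sketch of that kind of projection, with a hypothetical path and column positions:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("spark://master:7077", "ProjectionSketch")

// Hypothetical TSV input; keep only the fields that are actually used
// downstream before caching, so the cached footprint stays small.
val raw = sc.textFile("hdfs:///data/events.tsv", 200)

val projected = raw.map { line =>
  val cols = line.split("\t")
  (cols(0), cols(3), cols(7))  // illustrative column indices
}

projected.persist(StorageLevel.MEMORY_ONLY_SER)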

4
votes

There is a big difference between 'raw' caching and 'serialized' caching:

  1. Raw caching: (rdd.cache() or rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY))

    This will consume 2x-3x the memory. For example, a 100 MB RDD can consume 350 MB in memory.

  2. Serialized caching: (rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER))

    This consumes pretty much the same amount of memory as the data itself, plus a small overhead. For example, 100 MB of data will consume 100 MB plus a few KB in memory.

However, raw caching is faster during operations; the serialized cache takes longer, because the objects have to be deserialized before each computation.
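
A minimal way to compare the two yourself (the path is a placeholder); after forcing both RDDs to materialize, the Storage tab of the Spark web UI shows the memory used by each:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("spark://master:7077", "CachingComparison")

// Two independent RDDs over the same (placeholder) input, one per storage level;
// a single RDD cannot change its storage level once it has been set.
val rawCached = sc.textFile("hdfs:///data/sample.txt").persist(StorageLevel.MEMORY_ONLY)
val serCached = sc.textFile("hdfs:///data/sample.txt").persist(StorageLevel.MEMORY_ONLY_SER)

// Force materialization so both show up under the web UI's "Storage" tab.
rawCached.count()
serCached.count()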

Here is an interesting result from my experiment:

[images: benchmark results comparing raw and serialized caching]