0 votes

I am trying to fit an ML model in Spark (2.0.0) on a Google Dataproc cluster. When fitting the model I receive an "Executor heartbeat timed out" error. How can I resolve this?

Other answers suggest this is probably caused by (one of) the executors running out of memory. The solutions I have read are: set the right configuration, repartition, cache, or get a bigger cluster. What can I do, preferably without setting up a larger cluster? (Make more/fewer partitions? Cache less? Adjust settings?)
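For reference, the repartition/cache knobs I am asking about look roughly like this in PySpark (a minimal sketch only; the DataFrame, path, column names, and partition count below are placeholders, not my actual job):

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.getOrCreate()

# Placeholder input; the real data comes from elsewhere.
df = spark.read.parquet("gs://some-bucket/training-data")

# More, smaller partitions -> less data per task -> less memory pressure per task.
df = df.repartition(200)  # 200 is an arbitrary example value

# Cache only if the data is reused and fits in memory; otherwise skip it.
df.cache()

# The fit that fails with the heartbeat timeout (a StringIndexer, per the stack trace below).
indexer = StringIndexer(inputCol="some_column", outputCol="some_column_idx")
model = indexer.fit(df)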

My setting:

Spark 2.0.0 on a Google Dataproc cluster: 1 master and 2 workers, all with the same specs: n1-highmem-8 (8 vCPUs, 52.0 GB memory, 500 GB disk)

Settings:

spark:spark.executor.cores=1
distcp:mapreduce.map.java.opts=-Xmx2457m
spark:spark.driver.maxResultSize=1920m
mapred:mapreduce.map.java.opts=-Xmx2457m
yarn:yarn.nodemanager.resource.memory-mb=6144
mapred:mapreduce.reduce.memory.mb=6144
spark:spark.yarn.executor.memoryOverhead=384
mapred:mapreduce.map.cpu.vcores=1
distcp:mapreduce.reduce.memory.mb=6144
mapred:yarn.app.mapreduce.am.resource.mb=6144
mapred:mapreduce.reduce.java.opts=-Xmx4915m
yarn:yarn.scheduler.maximum-allocation-mb=6144
dataproc:dataproc.scheduler.max-concurrent-jobs=11
dataproc:dataproc.heartbeat.master.frequency.sec=30
mapred:mapreduce.reduce.cpu.vcores=2
distcp:mapreduce.reduce.java.opts=-Xmx4915m
distcp:mapreduce.map.memory.mb=3072
spark:spark.driver.memory=3840m
mapred:mapreduce.map.memory.mb=3072
yarn:yarn.scheduler.minimum-allocation-mb=512
mapred:yarn.app.mapreduce.am.resource.cpu-vcores=2
spark:spark.yarn.am.memoryOverhead=384
spark:spark.executor.memory=2688m
spark:spark.yarn.am.memory=2688m
mapred:yarn.app.mapreduce.am.command-opts=-Xmx4915m

Full Error:

Py4JJavaError: An error occurred while calling o4973.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 151 in stage 16964.0 failed 4 times, most recent failure: Lost task 151.3 in stage 16964.0 (TID 779444, reco-test-w-0.c.datasetredouteasvendor.internal): ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175122 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:372)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:372)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:371)
at org.apache.spark.rdd.RDD$$anonfun$countByValue$1.apply(RDD.scala:1156)
at org.apache.spark.rdd.RDD$$anonfun$countByValue$1.apply(RDD.scala:1156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.countByValue(RDD.scala:1155)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:91)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)

Comments:
Are you explicitly setting spark:spark.executor.cores and spark:spark.executor.memory yourself? By default Dataproc will have more than one core per executor, and it also looks like spark:spark.executor.memory is different from Dataproc's default computed value for n1-highmem-8. – Dennis Huo
Anyhow, the easiest way to get more memory per unit of work is to adjust spark.executor.memory; you can even do this at job-submission time without rebuilding your cluster. If you are using spark-shell or pyspark from the command line instead of Dataproc job submission, you can run, for example, pyspark --conf spark.executor.memory=5376m. You should be able to crank that number up until you approach the entire size of a single machine; with larger per-executor memory sizes you'll have fewer executors, though, so you might leave a few cores unused. – Dennis Huo
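A minimal sketch of the same idea when building the session in Python code rather than on the pyspark command line (assuming no SparkContext has been created yet; the app name and the cores value are placeholders, only the memory value comes from the comment above):

from pyspark.sql import SparkSession

# Executor memory must be set before the SparkContext starts, so configure it
# on the builder (or via --conf / job-submission properties, as described above).
spark = (
    SparkSession.builder
    .appName("heartbeat-timeout-debug")        # placeholder name
    .config("spark.executor.memory", "5376m")  # value from the comment above
    .config("spark.executor.cores", "2")       # example value, not a recommendation
    .getOrCreate()
)

# Confirm what actually took effect.
print(spark.sparkContext.getConf().get("spark.executor.memory"))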
You are right. The defaults for an n1-highmem-8 cluster are spark.executor.memory=18619m and spark.executor.cores=4. As I have 8-core workers with 52 GB memory, can I set spark.executor.memory=50000m and spark.executor.cores=8? Is this too high? – Stijn
A fraction of memory is reserved for various system daemons and overhead, HDFS, NodeManagers, etc., so you can look at yarn:yarn.scheduler.maximum-allocation-mb in your cluster description (gcloud dataproc clusters describe <cluster-name>) to find out the max YARN allocation, and subtract spark:spark.yarn.executor.memoryOverhead to find out how much you can request in spark.executor.memory. Note that the memory per executor core is what matters for OOM problems, so setting spark.executor.memory=40000m and spark.executor.cores=8 will have the same task limits as 5000m with cores=1. – Dennis Huo
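A small worked example of that arithmetic, using the values from the settings above and the figures quoted in these comments (a sketch only; the exact reserved fraction varies by cluster):

# Max memory YARN will grant to a single container (from the question's settings):
yarn_max_allocation_mb = 6144        # yarn:yarn.scheduler.maximum-allocation-mb
executor_overhead_mb = 384           # spark:spark.yarn.executor.memoryOverhead

# Largest spark.executor.memory that still fits in one YARN container:
max_executor_memory_mb = yarn_max_allocation_mb - executor_overhead_mb  # 5760

# What matters for OOM is memory per executor core (roughly memory per concurrent task):
per_core_mb_a = 40000 / 8  # executor.memory=40000m, executor.cores=8 -> 5000 MB per core
per_core_mb_b = 5000 / 1   # executor.memory=5000m,  executor.cores=1 -> 5000 MB per core

print(max_executor_memory_mb, per_core_mb_a, per_core_mb_b)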
Have you tried using the default settings yet? The defaults should already provide more memory headroom than the settings you posted in the question. To get to the bottom of a possible OOM, the easiest approach is actually just to reduce spark.executor.cores while leaving spark.executor.memory unchanged; the default has 18619m shared between 4 cores, so if you set 3 cores it's only split 3 ways. You can keep doing this until you find the right balance of memory and cores, and then potentially adjust down to single-core executors afterwards if desired. – Dennis Huo

1 Answer

1 vote

Since this question otherwise has no answer, to summarize: the issue appears to have been spark.executor.memory being set too low, which caused occasional out-of-memory errors on an executor.

The suggested fix was to first try the default Dataproc configuration, which attempts to fully use the cores and memory available on each instance. If issues continue, adjust spark.executor.memory and spark.executor.cores to increase the amount of memory available per task (essentially spark.executor.memory / spark.executor.cores).
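For example, a quick way to check which settings a job is actually running with and the resulting per-task memory budget (a PySpark sketch; the printed values depend on your cluster, and the 18619m default comes from the comments above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()

executor_memory = conf.get("spark.executor.memory", "not set")  # e.g. "18619m" by default on n1-highmem-8
executor_cores = int(conf.get("spark.executor.cores", "1"))

print("spark.executor.memory:", executor_memory)
print("spark.executor.cores: ", executor_cores)

# Rough memory budget per concurrent task, e.g. 18619m / 4 cores ~ 4654 MB per task.
if executor_memory.endswith("m"):
    print("approx MB per task:", int(executor_memory[:-1]) // executor_cores)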

Dennis also gives more details about the Spark memory config on Dataproc in the following answer:
Google Cloud Dataproc configuration issues