I am trying to fit a ml model in Spark (2.0.0) on a Google DataProc Cluster. When fitting the model I receive an Executor heartbeat timed out error. How can I resolve this?
Other solutions indicate this is probably due to Out of Memory of (one of) the executors. I read as solutions: Set the right setting, repartition, cache, and get a bigger cluster. What can I do, preferably without setting up a larger cluster? (Make more/less partitions? Cache less? Adjust settings?)
My setting:
Spark 2.0.0 on a Google DataProc Cluster: 1 Master and 2 workers all with the same specs: n1-highmem-8 -> 8 vCPUs, 52.0 GB memory - 500GB disk
Settings:
spark\:spark.executor.cores=1
distcp\:mapreduce.map.java.opts=-Xmx2457m
spark\:spark.driver.maxResultSize=1920m
mapred\:mapreduce.map.java.opts=-Xmx2457m
yarn\:yarn.nodemanager.resource.memory-mb=6144
mapred\:mapreduce.reduce.memory.mb=6144
spark\:spark.yarn.executor.memoryOverhead=384
mapred\:mapreduce.map.cpu.vcores=1
distcp\:mapreduce.reduce.memory.mb=6144
mapred\:yarn.app.mapreduce.am.resource.mb=6144
mapred\:mapreduce.reduce.java.opts=-Xmx4915m
yarn\:yarn.scheduler.maximum-allocation-mb=6144
dataproc\:dataproc.scheduler.max-concurrent-jobs=11
dataproc\:dataproc.heartbeat.master.frequency.sec=30
mapred\:mapreduce.reduce.cpu.vcores=2
distcp\:mapreduce.reduce.java.opts=-Xmx4915m
distcp\:mapreduce.map.memory.mb=3072
spark\:spark.driver.memory=3840m
mapred\:mapreduce.map.memory.mb=3072
yarn\:yarn.scheduler.minimum-allocation-mb=512
mapred\:yarn.app.mapreduce.am.resource.cpu-vcores=2
spark\:spark.yarn.am.memoryOverhead=384
spark\:spark.executor.memory=2688m
spark\:spark.yarn.am.memory=2688m
mapred\:yarn.app.mapreduce.am.command-opts=-Xmx4915m
Full Error:
Py4JJavaError: An error occurred while calling o4973.fit. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 151 in stage 16964.0 failed 4 times, most recent failure: Lost task 151.3 in stage 16964.0 (TID 779444, reco-test-w-0.c.datasetredouteasvendor.internal): ExecutorLostFailure (executor 14 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175122 ms Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.collect(RDD.scala:892) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:372) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:372) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:371) at org.apache.spark.rdd.RDD$$anonfun$countByValue$1.apply(RDD.scala:1156) at org.apache.spark.rdd.RDD$$anonfun$countByValue$1.apply(RDD.scala:1156) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.countByValue(RDD.scala:1155) at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:91) at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:211) at java.lang.Thread.run(Thread.java:745)
spark:spark.executor.cores
andspark:spark.executor.memory
yourself? By default Dataproc will have more than one core per executor, and it also looks likespark:spark.executor.memory
is different from Dataproc's default computed value for n1-highmem-8. – Dennis Huospark.executor.memory
; you can even do this at job-submission time without rebuilding your cluster; if usingspark-shell
orpyspark
from the command line instead of Dataproc job submission, you runpyspark --conf spark.executor.memory=5376m
for example. You should be able to crank that number up, all the way until you hit the entire size of a single machine approximately; with larger per-executor memory sizes you'll have fewer executors, though, so might leave a few cores unused using larger memory settings. – Dennis Huoyarn:yarn.scheduler.maximum-allocation-mb
in your cluster description (gcloud dataproc clusters describe <cluster-name>
) to find out the max YARN allocation, and subtractspark:spark.yarn.executor.memoryOverhead
to find out how much you can request inspark.executor.memory
. Note that the memory per executor core is what matters for OOM problems, so setting spark.executor.memory=40000m and spark.executor.cores=8 will have the same task limits as 5000m/cores=1 – Dennis Huospark.executor.cores
while leavingspark.executor.memory
unchanged; default has the 18619m shared between 4 cores; if you set to 3 cores, then it's only split between 3 cores; you can do this until you find the necessary balance of memory and cores and then potentially adjust it down to single-core executors as desired afterwards. – Dennis Huo