We are trying to run a Spark job on a Dataproc cluster using PySpark 2.2.0, but the job stops after a seemingly random amount of time with the following error message:
17/07/25 00:52:48 ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:702)
The error can occur after only a couple of minutes or after as long as 3 hours; in my experience, the job usually runs for 30 minutes to an hour before hitting it.
Once the Spark job hits the error, it simply stops; no matter how long I wait, it outputs nothing. On the YARN ResourceManager the application status is still labelled "RUNNING", and I have to Ctrl+C to terminate the program, at which point the application is labelled "FINISHED".
I run the Spark job from the master node's console with:
/path/to/spark/bin/spark-submit --jars /path/to/jar/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar spark_job.py
The JAR file is necessary because the Spark job streams messages from Kafka (running on the same cluster as the Spark job) and pushes some of them back to the same Kafka under a different topic.
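For context, here is a minimal sketch of the kind of job this is, not our exact code; the topic names, the ZooKeeper quorum, the batch interval, and the use of the kafka-python producer on the write side are all illustrative assumptions:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # provided by the 0-8 assembly JAR

sc = SparkContext(appName="spark_job")
ssc = StreamingContext(sc, 5)  # 5-second batches (placeholder)

# Receiver-based Kafka stream via ZooKeeper (the 0-8 integration)
stream = KafkaUtils.createStream(
    ssc,
    "highmem-m:2181",        # ZooKeeper quorum (placeholder)
    "spark-consumer-group",  # consumer group id (placeholder)
    {"input-topic": 1})      # topic -> number of receiver threads

def push_back(partition):
    # kafka-python on the producer side is an assumption; one producer per partition
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers="highmem-m:9092")  # placeholder broker
    for _, value in partition:
        producer.send("output-topic", value.encode("utf-8"))     # placeholder topic
    producer.flush()
    producer.close()

stream.foreachRDD(lambda rdd: rdd.foreachPartition(push_back))

ssc.start()
ssc.awaitTermination()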
I've already looked at some other answers on this site (primarily this and this) and they have been somewhat helpful, but we haven't been able to find anywhere in the logs that states what caused the executors to die. So far, I've monitored the nodes during the task through the YARN ResourceManager and gone through the logs in the /var/logs/hadoop-yarn directory on every node. The only "clue" I could find was
org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
which is the only line written to the dead executor's logs.
As a last-ditch effort, we increased the cluster's memory in the hope that the issue would simply go away, but it hasn't. Originally the cluster had 1 master and 2 workers, each with 4 vCPUs and 15 GB of memory. We created a new Dataproc cluster with 1 master and 3 workers, the workers each having 8 vCPUs and 52 GB of memory (the master has the same specs as before).
What we would like to know is:
1. Where/how can I see the exception that is causing the executors to be terminated?
2. Is this an issue with how Spark is configured?
3. Dataproc image version is "preview". Could that possibly be the cause of the error?
and ultimately,
4. How do we resolve this issue? What other steps can we take?
This Spark job needs to stream from Kafka continuously for an indefinite amount of time, so we would like this error fixed rather than merely delayed.
Here are some screenshots from the YARN ResourceManager demonstrating what we are seeing (taken before the Spark job stopped with the error).
And this is the Spark configuration file located in /path/to/spark/conf/spark-defaults.conf (we did not change anything from the Dataproc defaults):
spark.master yarn
spark.submit.deployMode client
spark.yarn.jars=local:/usr/lib/spark/jars/*
spark.eventLog.enabled true
spark.eventLog.dir hdfs://highmem-m/user/spark/eventlog
# Dynamic allocation on YARN
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.executor.instances 10000
spark.dynamicAllocation.maxExecutors 10000
spark.shuffle.service.enabled true
spark.scheduler.minRegisteredResourcesRatio 0.0
spark.yarn.historyServer.address highmem-m:18080
spark.history.fs.logDirectory hdfs://highmem-m/user/spark/eventlog
spark.executor.cores 2
spark.executor.memory 4655m
spark.yarn.executor.memoryOverhead 465
# Overkill
spark.yarn.am.memory 4655m
spark.yarn.am.memoryOverhead 465
spark.driver.memory 3768m
spark.driver.maxResultSize 1884m
spark.rpc.message.maxSize 512
# Add ALPN for Bigtable
spark.driver.extraJavaOptions
spark.executor.extraJavaOptions
# Disable Parquet metadata caching as its URI re-encoding logic does
# not work for GCS URIs (b/28306549). The net effect of this is that
# Parquet metadata will be read both driver side and executor side.
spark.sql.parquet.cacheMetadata=false
# User-supplied properties.
#Mon Jul 24 23:12:12 UTC 2017
spark.executor.cores=4
spark.executor.memory=18619m
spark.driver.memory=3840m
spark.driver.maxResultSize=1920m
spark.yarn.am.memory=640m
spark.executorEnv.PYTHONHASHSEED=0
I'm not quite sure where the "User-supplied properties" came from.
Edit:
Some additional information about the clusters:
I use the zookeeper, kafka, and jupyter initialization action scripts found at https://github.com/GoogleCloudPlatform/dataproc-initialization-actions, applied in the order zookeeper -> kafka -> jupyter (unfortunately I don't have enough reputation to post more than 2 links at the moment).
Edit 2:
Following @Dennis's insightful questions, we ran the Spark job while paying particular attention to the executors with high on-heap storage memory usage. What I noticed is that the executors from worker #0 always have significantly higher storage memory usage than the other executors. The stdout files for worker #0's executors are always empty, while these three lines are repeated over and over in stderr:
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to
(...the same three lines continue repeating)
It seems to repeat every 1~3 seconds. The stdout and stderr files for the executors on the other worker nodes are empty.
Edit 3:
As mentioned in @Dennis's comments, the Kafka topic the Spark job was consuming from had a replication factor of 1. I also found that I had forgotten to add worker #2 to zookeeper.connect in the Kafka config file, and that I had never given the Spark consumer streaming messages from Kafka a group ID. I've fixed both (and recreated the topic with a replication factor of 3), and observed that the workload now falls mainly on worker #1. Following @Dennis's suggestions, I ran sudo jps after SSH-ing into worker #1 and got the following output:
[Removed this section to save character space; it was only the error messages from a failed call to jmap so it didn't hold any useful information]
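For reference, this is roughly the shape of the consumer-side fix (the hostnames, port, topic, and group name are placeholders; the zookeeper.connect fix itself lives in Kafka's server.properties, not here):

# Receiver now gets the full ZooKeeper quorum and an explicit group id
stream = KafkaUtils.createStream(
    ssc,
    "highmem-w-0:2181,highmem-w-1:2181,highmem-w-2:2181",  # all three workers (placeholder)
    "spark-streaming-group",                               # the previously missing group id
    {"input-topic": 1})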
Edit 4:
I'm now seeing this in worker #1 executors' stdout files:
2017-07-27 22:16:24
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
===Truncated===
Heap
PSYoungGen total 814592K, used 470009K [0x000000063c180000, 0x000000069e600000, 0x00000007c0000000)
eden space 799744K, 56% used [0x000000063c180000,0x0000000657e53598,0x000000066ce80000)
from space 14848K, 97% used [0x000000069d780000,0x000000069e5ab1b8,0x000000069e600000)
to space 51200K, 0% used [0x0000000698200000,0x0000000698200000,0x000000069b400000)
ParOldGen total 574464K, used 180616K [0x0000000334400000, 0x0000000357500000, 0x000000063c180000)
object space 574464K, 31% used [0x0000000334400000,0x000000033f462240,0x0000000357500000)
Metaspace used 49078K, capacity 49874K, committed 50048K, reserved 1093632K
class space used 6054K, capacity 6263K, committed 6272K, reserved 1048576K
and
2017-07-27 22:06:44
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
===Truncated===
Heap
PSYoungGen total 608768K, used 547401K [0x000000063c180000, 0x000000066a280000, 0x00000007c0000000)
eden space 601088K, 89% used [0x000000063c180000,0x000000065d09c498,0x0000000660c80000)
from space 7680K, 99% used [0x0000000669b00000,0x000000066a2762c8,0x000000066a280000)
to space 36864K, 0% used [0x0000000665a80000,0x0000000665a80000,0x0000000667e80000)
ParOldGen total 535552K, used 199304K [0x0000000334400000, 0x0000000354f00000, 0x000000063c180000)
object space 535552K, 37% used [0x0000000334400000,0x00000003406a2340,0x0000000354f00000)
Metaspace used 48810K, capacity 49554K, committed 49792K, reserved 1093632K
class space used 6054K, capacity 6263K, committed 6272K, reserved 1048576K
When the error happened, an executor on worker #2 received SIGNAL TERM and was labelled dead; at that time it was the only dead executor. Strangely, the Spark job picked back up again after 10 minutes or so. Looking at the Spark UI, only the executors from worker #1 are now active and the rest are dead. This is the first time this has happened.
Edit 5:
Again following @Dennis's suggestions (thank you, @Dennis!), I ran sudo -u yarn jmap -histo <pid> this time. These are the top 10 most memory-hogging classes in the CoarseGrainedExecutorBackend after about 10 minutes (in JVM histograms, [B, [I, and [C denote byte, int, and char arrays respectively):
num #instances #bytes class name
----------------------------------------------
1: 244824 358007944 [B
2: 194242 221184584 [I
3: 2062554 163729952 [C
4: 746240 35435976 [Ljava.lang.Object;
5: 738 24194592 [Lorg.apache.spark.unsafe.memory.MemoryBlock;
6: 975513 23412312 java.lang.String
7: 129645 13483080 java.io.ObjectStreamClass
8: 451343 10832232 java.lang.StringBuilder
9: 38880 10572504 [Z
10: 120807 8698104 java.lang.reflect.Field
Also, I've encountered a new type of error that caused an executor to die. It produced some failed tasks, highlighted in the Spark UI, and I found this in the executor's stderr:
17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 6821.0 (TID 2585)
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.1 in stage 6821.0 (TID 2586)
java.lang.AssertionError: assertion failed
(...same stack trace as the task 0.0 failure above)
17/07/28 00:44:03 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/bin/python
java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/07/28 00:44:03 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/conda/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
(...same stack trace as the Utils error above)
Edit 6:
This time, I took the jmap histogram after 40 minutes of running:
num #instances #bytes class name
----------------------------------------------
1: 23667 391136256 [B
2: 25937 15932728 [I
3: 159174 12750016 [C
4: 334 10949856 [Lorg.apache.spark.unsafe.memory.MemoryBlock;
5: 78437 5473992 [Ljava.lang.Object;
6: 125322 3007728 java.lang.String
7: 40931 2947032 java.lang.reflect.Field
8: 63431 2029792 com.esotericsoftware.kryo.Registration
9: 20897 1337408 com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
10: 20323 975504 java.util.HashMap
These are the results of ps ux:
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
yarn 601 0.8 0.9 3008024 528812 ? Sl 16:12 1:17 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dproc_nodema
yarn 6086 6.3 0.0 96764 24340 ? R 18:37 0:02 /opt/conda/bin/python -m pyspark.daemon
yarn 8036 8.2 0.0 96296 24136 ? S 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8173 9.4 0.0 97108 24444 ? S 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8240 9.0 0.0 96984 24576 ? S 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8329 7.6 0.0 96948 24720 ? S 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8420 8.5 0.0 96240 23788 ? R 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8487 6.0 0.0 96864 24308 ? S 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8554 0.0 0.0 96292 23724 ? S 18:37 0:00 /opt/conda/bin/python -m pyspark.daemon
yarn 8564 0.0 0.0 19100 2448 pts/0 R+ 18:37 0:00 ps ux
yarn 31705 0.0 0.0 13260 2756 ? S 17:56 0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
yarn 31707 0.0 0.0 13272 2876 ? Ss 17:56 0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java
yarn 31713 0.4 0.7 2419520 399072 ? Sl 17:56 0:11 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx6
yarn 31771 0.0 0.0 13260 2740 ? S 17:56 0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
yarn 31774 0.0 0.0 13284 2800 ? Ss 17:56 0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java
yarn 31780 11.1 1.4 21759016 752132 ? Sl 17:56 4:31 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx1
yarn 31883 0.1 0.0 96292 27308 ? S 17:56 0:02 /opt/conda/bin/python -m pyspark.daemon
The pid of the CoarseGrainedExecutorBackend is 31780 in this case.
Edit 7:
Increasing heartbeatInterval in the Spark settings did not change anything, which makes sense in hindsight.
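For reference, the interval was raised roughly like this (the values shown are illustrative, not tuned ones); note that spark.network.timeout has to remain larger than the heartbeat interval:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.heartbeatInterval", "60s")  # default is 10s
        .set("spark.network.timeout", "600s"))           # must exceed the heartbeat interval
sc = SparkContext(conf=conf)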
I created a short bash script that reads from Kafka with the console consumer for 5 seconds and writes the messages to a text file, which is uploaded to HDFS for Spark to stream from. We used this to test whether the timeout was related to Kafka:
- Streaming from Hadoop and outputting to Kafka from Spark caused SocketTimeout
- Streaming from Kafka directly and not outputting to Kafka from Spark caused SocketTimeout
- Streaming from Hadoop and not outputting to Kafka from Spark caused SocketTimeout
So we moved on under the assumption that Kafka had nothing to do with the timeout.
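For reference, the variant that streams from HDFS instead of Kafka looked roughly like this (the dump directory is a placeholder; the bash script simply piped kafka-console-consumer output into a file for 5 seconds and copied the file into that directory):

# File-based stream over the directory the bash script uploads to,
# taking Kafka out of Spark's input path entirely
lines = ssc.textFileStream("hdfs://highmem-m/user/<user_name>/kafka_dump")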
We installed Stackdriver Monitoring to watch memory usage as the timeout occurred. Nothing of interest showed up in the metrics; memory usage looked relatively stable throughout (hovering around 10~15% at most on the busiest nodes). We guessed that communication between the worker nodes might be causing the issue; right now our data traffic is very low, so even a single worker can handle the whole workload with relative ease.
Running the Spark job on a single-node cluster while streaming from Kafka brokers on a different cluster seemed to stop the SocketTimeout... except that the AssertionError documented above now occurs frequently.
Per @Dennis's suggestion, I created a new cluster (also single-node), this time without the jupyter initialization script, which means Spark now runs on Python v2.7.9 (without Anaconda). On the first run, Spark hit the SocketTimeoutException in just 15 seconds. The second run lasted just over 2 hours before failing with the same AssertionError. I'm starting to wonder if this is a problem with Spark's internals. The third run lasted about 40 minutes and then ran into the SocketTimeoutException.