We are currently trying to run a Spark job on a Dataproc cluster using PySpark 2.2.0, but the Spark job stops after a seemingly random amount of time with the following error message:

17/07/25 00:52:48 ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:702)

Sometimes the error appears after only a couple of minutes; other times it takes 3 hours. From personal experience, the Spark job most often runs for about 30 minutes to 1 hour before hitting the error.

Once the Spark job hits the error, it simply stalls: no matter how long I wait, it outputs nothing. In the YARN ResourceManager, the application status is still labeled "RUNNING", and I have to Ctrl+C to terminate the program. At that point, the application is labeled "FINISHED".

I run the Spark job from the master node's console with /path/to/spark/bin/spark-submit --jars /path/to/jar/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar spark_job.py. The JAR file is necessary because the Spark job streams messages from Kafka (running on the same cluster as the Spark job) and pushes some messages back to a different topic on the same Kafka.
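
For context, the job has roughly the shape sketched below. This is not our actual code: the topic names, ZooKeeper/broker addresses, batch interval, consumer group, and the use of a kafka-python producer for the write-back are all placeholders and assumptions.

# Hedged sketch of the job's shape, not the actual code.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils   # provided by the 0-8 assembly JAR

sc = SparkContext(appName="kafka_round_trip_sketch")
ssc = StreamingContext(sc, 5)   # 5-second batches (placeholder)

# Receiver-based stream; arguments are ZooKeeper quorum, consumer group id, {topic: receiver threads}.
stream = KafkaUtils.createStream(
    ssc,
    "highmem-m:2181",     # assumed ZooKeeper address
    "example-group",      # consumer group id (we initially neglected to set a meaningful one; see Edit 3 below)
    {"input-topic": 1})

def push_back(rdd):
    def send_partition(records):
        from kafka import KafkaProducer   # assumes kafka-python is available on the workers
        producer = KafkaProducer(bootstrap_servers="highmem-m:9092")
        for _key, value in records:
            producer.send("output-topic", value.encode("utf-8"))
        producer.flush()
    rdd.foreachPartition(send_partition)

stream.foreachRDD(push_back)
ssc.start()
ssc.awaitTermination()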

I've already looked at some other answers on this site (primarily this and this) and they have been somewhat helpful, but we haven't been able to track down where in the logs the cause of the dying executors might be recorded. So far, I've monitored the nodes during the task through the YARN ResourceManager and gone through the logs in the /var/logs/hadoop-yarn directory on every node. The only "clue" I could find was org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM, which is the only line written to the dead executor's logs.

As a last-ditch effort, we tried increasing the cluster's memory in the hope that the issue would just go away, but it hasn't. Originally, the cluster was a 1 master / 2 worker cluster with 4 vCPUs and 15 GB of memory. We created a new Dataproc cluster, this time with 1 master and 3 workers, the workers each having 8 vCPUs and 52 GB of memory (the master has the same specs as before).

What we would like to know is:
1. Where/how can I see the exception that is causing the executors to be terminated?
2. Is this an issue with how Spark is configured?
3. Dataproc image version is "preview". Could that possibly be the cause of the error?
and ultimately,
4. How do we resolve this issue? What other steps can we take?

This Spark job needs to stream from Kafka continuously for an indefinite amount of time, so we would like this error to be fixed rather than merely prolonging the time it takes to occur.

Here are some screenshots from the YARN ResourceManager to demonstrate what we are seeing:

[Screenshot: Cluster Metrics]

[Screenshot: Executor Summary]

The screenshots were taken before the Spark job stopped with the error.

And this is the Spark configuration file located at /path/to/spark/conf/spark-defaults.conf (we did not change anything from the defaults set by Dataproc):

spark.master yarn
spark.submit.deployMode client
spark.yarn.jars=local:/usr/lib/spark/jars/*
spark.eventLog.enabled true
spark.eventLog.dir hdfs://highmem-m/user/spark/eventlog

# Dynamic allocation on YARN
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.executor.instances 10000
spark.dynamicAllocation.maxExecutors 10000
spark.shuffle.service.enabled true
spark.scheduler.minRegisteredResourcesRatio 0.0

spark.yarn.historyServer.address highmem-m:18080
spark.history.fs.logDirectory hdfs://highmem-m/user/spark/eventlog

spark.executor.cores 2
spark.executor.memory 4655m
spark.yarn.executor.memoryOverhead 465

# Overkill
spark.yarn.am.memory 4655m
spark.yarn.am.memoryOverhead 465

spark.driver.memory 3768m
spark.driver.maxResultSize 1884m
spark.rpc.message.maxSize 512

# Add ALPN for Bigtable
spark.driver.extraJavaOptions 
spark.executor.extraJavaOptions 

# Disable Parquet metadata caching as its URI re-encoding logic does
# not work for GCS URIs (b/28306549). The net effect of this is that
# Parquet metadata will be read both driver side and executor side.
spark.sql.parquet.cacheMetadata=false

# User-supplied properties.
#Mon Jul 24 23:12:12 UTC 2017
spark.executor.cores=4
spark.executor.memory=18619m
spark.driver.memory=3840m
spark.driver.maxResultSize=1920m
spark.yarn.am.memory=640m
spark.executorEnv.PYTHONHASHSEED=0

I'm not quite sure where the User-supplied properties came from.

Edit:
Some additional information about the clusters: I use the zookeeper, kafka, and jupyter initialization action scripts found at https://github.com/GoogleCloudPlatform/dataproc-initialization-actions, run in the order zookeeper -> kafka -> jupyter (unfortunately I don't have enough reputation to post more than 2 links at the moment).

Edit 2:
Prompted by @Dennis's insightful questions, we ran the Spark job again while paying particular attention to the executors with higher On Heap Storage Memory used. What I noticed is that it is always the executors on worker #0 that have significantly higher storage memory usage than the other executors. The stdout files for worker #0's executors are always empty, and these three lines are repeated many times over in stderr:

17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to

It seems to repeat every 1-3 seconds.

The stdout and stderr files for the executors on the other worker nodes are empty.

Edit 3:
As mentioned in @Dennis's comments, we had kept the Kafka topic the Spark job consumes from at a replication factor of 1. I also found that I had forgotten to add worker #2 to zookeeper.connect in the Kafka config file and had forgotten to give the Spark consumer that streams messages from Kafka a group ID. I've fixed both (and recreated the topic with a replication factor of 3) and observed that the workload now falls mainly on worker #1. Following @Dennis's suggestions, I ran sudo jps after SSH-ing into worker #1 and got the following output:

[Removed this section to save character space; it was only the error messages from a failed call to jmap so it didn't hold any useful information]
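
For reference, on the Spark side the group-ID fix simply amounts to passing an explicit consumer group to KafkaUtils.createStream. This is a hedged sketch: the hostnames, group name, and topic are placeholders, and the zookeeper.connect fix itself lives in Kafka's own server.properties rather than in the Spark code.

# Placeholders only; the third argument is the consumer group id that had been missing.
stream = KafkaUtils.createStream(
    ssc,
    "highmem-w-0:2181,highmem-w-1:2181,highmem-w-2:2181",  # assumed ZooKeeper quorum
    "spark-streaming-group",                                # explicit consumer group id
    {"input-topic": 1})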

Edit 4:
I'm now seeing this in worker #1 executors' stdout files:

2017-07-27 22:16:24
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
===Truncated===
Heap
 PSYoungGen      total 814592K, used 470009K [0x000000063c180000, 0x000000069e600000, 0x00000007c0000000)
  eden space 799744K, 56% used [0x000000063c180000,0x0000000657e53598,0x000000066ce80000)
  from space 14848K, 97% used [0x000000069d780000,0x000000069e5ab1b8,0x000000069e600000)
  to   space 51200K, 0% used [0x0000000698200000,0x0000000698200000,0x000000069b400000)
 ParOldGen       total 574464K, used 180616K [0x0000000334400000, 0x0000000357500000, 0x000000063c180000)
  object space 574464K, 31% used [0x0000000334400000,0x000000033f462240,0x0000000357500000)
 Metaspace       used 49078K, capacity 49874K, committed 50048K, reserved 1093632K
  class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K

and

2017-07-27 22:06:44
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
===Truncated===
Heap
 PSYoungGen      total 608768K, used 547401K [0x000000063c180000, 0x000000066a280000, 0x00000007c0000000)
  eden space 601088K, 89% used [0x000000063c180000,0x000000065d09c498,0x0000000660c80000)
  from space 7680K, 99% used [0x0000000669b00000,0x000000066a2762c8,0x000000066a280000)
  to   space 36864K, 0% used [0x0000000665a80000,0x0000000665a80000,0x0000000667e80000)
 ParOldGen       total 535552K, used 199304K [0x0000000334400000, 0x0000000354f00000, 0x000000063c180000)
  object space 535552K, 37% used [0x0000000334400000,0x00000003406a2340,0x0000000354f00000)
 Metaspace       used 48810K, capacity 49554K, committed 49792K, reserved 1093632K
  class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K

When the error happened, an executor from worker #2 received SIGNAL TERM and was labeled as dead. At this time, it was the only dead executor.

Strangely, the Spark job picked back up again after 10 minutes or so. Looking at the Spark UI, only the executors from worker #1 are active and the rest are dead. This is the first time this has happened.

Edit 5:
Again following @Dennis's suggestions (thank you, @Dennis!), this time I ran sudo -u yarn jmap -histo <pid>. These are the top 10 memory-hogging classes in CoarseGrainedExecutorBackend after about 10 minutes ([B, [I, and [C are the JVM's internal names for byte[], int[], and char[] arrays):

 num     #instances         #bytes  class name
----------------------------------------------
   1:        244824      358007944  [B
   2:        194242      221184584  [I
   3:       2062554      163729952  [C
   4:        746240       35435976  [Ljava.lang.Object;
   5:           738       24194592  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
   6:        975513       23412312  java.lang.String
   7:        129645       13483080  java.io.ObjectStreamClass
   8:        451343       10832232  java.lang.StringBuilder
   9:         38880       10572504  [Z
  10:        120807        8698104  java.lang.reflect.Field

Also, I've encountered a new type of error that caused an executor to die. It produced some failed tasks highlighted in the Spark UI, and I found this in the executor's stderr:

17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 6821.0 (TID 2585)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.1 in stage 6821.0 (TID 2586)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
17/07/28 00:44:03 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/bin/python
java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/07/28 00:44:03 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/conda/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

Edit 6:
This time I took the jmap histogram after 40 minutes of running:

 num     #instances         #bytes  class name
----------------------------------------------
   1:         23667      391136256  [B
   2:         25937       15932728  [I
   3:        159174       12750016  [C
   4:           334       10949856  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
   5:         78437        5473992  [Ljava.lang.Object;
   6:        125322        3007728  java.lang.String
   7:         40931        2947032  java.lang.reflect.Field
   8:         63431        2029792  com.esotericsoftware.kryo.Registration
   9:         20897        1337408  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  10:         20323         975504  java.util.HashMap

These are the results of ps ux:

  USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
yarn       601  0.8  0.9 3008024 528812 ?      Sl   16:12   1:17 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dproc_nodema
yarn      6086  6.3  0.0  96764 24340 ?        R    18:37   0:02 /opt/conda/bin/python -m pyspark.daemon
yarn      8036  8.2  0.0  96296 24136 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8173  9.4  0.0  97108 24444 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8240  9.0  0.0  96984 24576 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8329  7.6  0.0  96948 24720 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8420  8.5  0.0  96240 23788 ?        R    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8487  6.0  0.0  96864 24308 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8554  0.0  0.0  96292 23724 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8564  0.0  0.0  19100  2448 pts/0    R+   18:37   0:00 ps ux
yarn     31705  0.0  0.0  13260  2756 ?        S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
yarn     31707  0.0  0.0  13272  2876 ?        Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
yarn     31713  0.4  0.7 2419520 399072 ?      Sl   17:56   0:11 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx6
yarn     31771  0.0  0.0  13260  2740 ?        S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
yarn     31774  0.0  0.0  13284  2800 ?        Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
yarn     31780 11.1  1.4 21759016 752132 ?     Sl   17:56   4:31 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx1
yarn     31883  0.1  0.0  96292 27308 ?        S    17:56   0:02 /opt/conda/bin/python -m pyspark.daemon

The pid of the CoarseGrainedExecutorBackend in this case is 31780.

Edit 7:
Increasing heartbeatInterval in the Spark settings did not change anything, which makes sense in hindsight.
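
(For reference, a setting like that can be supplied through SparkConf; the property name and value below are illustrative, not the exact configuration we used. The same property can also be passed with --conf on spark-submit.)

from pyspark import SparkConf, SparkContext

# Illustrative only; the interval value is a placeholder.
conf = SparkConf().set("spark.executor.heartbeatInterval", "60s")
sc = SparkContext(conf=conf)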

I created a short bash script that reads from Kafka with the console consumer for 5 seconds and writes the messages into a text file, which is then uploaded to Hadoop, where Spark streams from. We used this method to test whether the timeout was related to Kafka; a rough sketch of the sampling step is included after the list below.

  • Streaming from Hadoop and outputting to Kafka from Spark caused SocketTimeout
  • Streaming from Kafka directly and not outputting to Kafka from Spark caused SocketTimeout
  • Streaming from Hadoop and not outputting to Kafka from Spark caused SocketTimeout

So we moved on with the assumption that Kafka had nothing to do with the Timeout.
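
For reference, here is a rough sketch of the 5-second sampling step, written with kafka-python instead of the console-consumer bash script we actually used; the topic, broker address, and paths are placeholders.

# Hedged sketch: pull messages for roughly 5 seconds, dump them to a text file, push to HDFS.
import subprocess
from kafka import KafkaConsumer   # assumes kafka-python is installed

consumer = KafkaConsumer("input-topic",
                         bootstrap_servers="highmem-m:9092",
                         consumer_timeout_ms=5000)  # stop once no message arrives for 5 seconds
with open("/tmp/kafka_sample.txt", "w") as f:
    for message in consumer:
        f.write(message.value.decode("utf-8") + "\n")
consumer.close()

# Upload the sample so the Spark job can stream it from HDFS instead of Kafka.
subprocess.check_call(["hdfs", "dfs", "-put", "-f",
                       "/tmp/kafka_sample.txt", "/user/spark/kafka_sample.txt"])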

We installed Stackdriver Monitoring to watch memory usage as the timeout occurred. Nothing really interesting showed up in the metrics; memory usage looked relatively stable throughout (hovering around 10-15% at most on the busiest nodes).

We guessed that something in the communication between the worker nodes might be causing the issue. Right now our data traffic is very low, so even a single worker can handle the whole workload with relative ease.

Running the Spark job on a single-node cluster while streaming from Kafka brokers on a different cluster seemed to stop the SocketTimeout... except that the AssertionError documented above now occurs frequently.

Per @Dennis's suggestion, I created a new cluster (also single-node), this time without the jupyter initialization script, which means Spark now runs on Python 2.7.9 (without Anaconda). On the first run, Spark hit the SocketTimeoutException after just 15 seconds. The second run lasted just over 2 hours before failing with the same AssertionError. The third run lasted about 40 minutes and then ran into the SocketTimeoutException again. I'm starting to wonder whether this is a problem with Spark's internals.

Comments:
Did using higher-memory workers increase the time it took before the error happens each time? – Dennis Huo
@DennisHuo As far as I can tell, increasing memory doesn't seem to have changed how long it takes for the error to occur. – user8366430
Did you look through the "stdout", "stderr" and Thread Dump links that Spark provides for any additional clues? Especially for ones that have "On Heap Storage Memory" creep up above all the others. Is the memory creep mostly linear? What's the actual amount of time before hitting the error each time? – Dennis Huo
@DennisHuo Thank you for your patience. I've added some additional info regarding stdout/stderr in Edit 2. As for memory creep, yes, it has been mostly linear. As for the actual amount of time before the error, as I've mentioned, it's been very sporadic, so it is hard to give a solid answer. The most recent run took 15 minutes 3 seconds to hit the error; the run before that took 1 hour and 41 seconds. – user8366430
The fact that it's two separate executors, but both on worker 0 which has the high storage, implies to me that you might have some skew in your Kafka topics. What's the source of data going into Kafka? Do you have the Kafka replication factor set > 1 for the topic(s) you're using? Also, if you have it running, you should be able to SSH into the worker node with higher memory usage, and if you type sudo jps you should see something that looks like a Spark executor (I forget exactly, maybe CoarseGrainedExecutorBackend). Either way, find the java process using memory, then sudo jmap -histo <pid>. – Dennis Huo

1 Answer

A client of mine was seeing various production PySpark jobs (Spark version 2.2.1) fail intermittently in Google Cloud Dataproc with a stack trace very similar to yours:

ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
    java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:711)

I found that disabling IPv6 on the Dataproc cluster VMs seemed to fix the issue. One way to do that is to add these lines to a Dataproc init script so they run at cluster creation time:

printf "\nnet.ipv6.conf.default.disable_ipv6 = 1\nnet.ipv6.conf.all.disable_ipv6=1\n" >> /etc/sysctl.conf
sysctl -p