3
votes

I am using Spark Streaming on Google Cloud Dataproc to run a framework (written in Python) that consists of several continuous pipelines. Each pipeline is a single job on Dataproc that reads from Kafka queues and writes the transformed output to Bigtable. Combined, the pipelines handle several gigabytes of data per day across 2 clusters, one with 3 worker nodes and one with 4.

Running this Spark Streaming framework on top of Dataproc was fairly stable until the beginning of May (3rd of May to be precise). Since then we have been experiencing frequent socket timeout exceptions that terminate our pipelines. It does not seem to be related to the load on the cluster, which has not increased significantly. The timeouts occur at seemingly random times throughout the day, and I have checked possibly related code changes but could not find any. Moreover, this only seems to occur on the cluster with 4 worker nodes, while the pipelines on the cluster with 3 nodes are very similar and experience no timeouts at all. I have already recreated the cluster twice, but the issue remains and it affects all pipelines running on this Dataproc cluster. The cluster with 3 nodes uses the n1-standard-4 machine type, while the troublesome cluster with 4 nodes uses the n1-standard-8 machine type; other than that, their configuration is identical.

Example output of a pipeline job execution when the problem occurs and the job terminates:

java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd
    .foreach(lambda *args: None)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach
    self.mapPartitions(processPartition).count()  # Force evaluation
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
timeout: timed out

The stack trace starts in the process_kafka_rdd method of our StreamManager module, which processes a single discrete RDD within the direct stream of Kafka messages. Our integration of Kafka with Spark Streaming is based on the "direct approach" described at http://spark.apache.org/docs/latest/streaming-kafka-integration.html

1
What is the number of consumers and partitions that you have at the time of that error? (comment by mamdouh alramadan)

1 Answer

1
votes

In my experience, socket errors in Spark usually mean that some executor has suddenly died. Another executor that was communicating with it at the time then raises the socket error.

The usual cause of unexpected executor death, as far as I have seen, is running out of some resource, most often memory.

(It's important to tune the amount of memory executors can use. The defaults are typically way too low. But I suspect you are already aware of this.)
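As a rough illustration of that tuning, the sketch below builds the two Spark properties that most directly control executor memory on YARN and formats them for a job submission (for example through the `--properties` flag of `gcloud dataproc jobs submit pyspark`). The helper names and the 4 GB figure are hypothetical and only for illustration. The overhead rule of max(384 MB, 10% of heap) is an assumption modelled on Spark's documented default, so check it against the Spark version you run.

```python
# Hypothetical helpers; sizes are illustrative assumptions, not recommendations.

def executor_memory_properties(executor_memory_gb, overhead_fraction=0.10,
                               min_overhead_mb=384):
    """Return Spark properties for executor heap and off-heap overhead."""
    heap_mb = int(executor_memory_gb * 1024)
    # Assumed rule: overhead is the larger of a fixed floor and a heap fraction.
    overhead_mb = max(min_overhead_mb, int(heap_mb * overhead_fraction))
    return {
        "spark.executor.memory": "%dm" % heap_mb,
        # Spark 1.x on YARN property name (value in MB);
        # newer releases use spark.executor.memoryOverhead instead.
        "spark.yarn.executor.memoryOverhead": str(overhead_mb),
    }


def to_properties_flag(props):
    """Join properties as key=value pairs separated by commas."""
    return ",".join("%s=%s" % (key, props[key]) for key in sorted(props))


if __name__ == "__main__":
    props = executor_memory_properties(4)
    print("--properties=" + to_properties_flag(props))
```

The heap plus the overhead is what YARN counts against the container limit, so both values need to fit within what a node can actually allocate per container.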

I assume Spark is running on top of YARN? In my experience, Spark does a poor job of reporting the cause of a problem when it occurs down in the guts of YARN, so one has to dig into the YARN logs to figure out what actually caused the sudden executor death. Each executor runs in a YARN "container"; somewhere in the YARN logs there should be a record of a container falling over.
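One way to do that digging is to scan the log text for containers that YARN killed for exceeding a memory limit. The sketch below is a minimal example under assumptions: the pattern is based on the "is running beyond physical memory limits" message that YARN's NodeManager commonly emits, the function name is hypothetical, and the sample lines are made up purely to exercise the code. Depending on the setup, the message may be in the NodeManager logs rather than in the output of `yarn logs -applicationId <application id>`.

```python
import re

# Assumed message shape; adjust to what your YARN version actually logs.
KILL_PATTERN = re.compile(
    r"containerID=(container_\w+)\] is running beyond "
    r"(physical|virtual) memory limits"
)


def find_killed_containers(lines):
    """Yield (container_id, limit_kind) for each memory-limit kill found."""
    for line in lines:
        match = KILL_PATTERN.search(line)
        if match:
            yield match.group(1), match.group(2)


# Made-up sample lines in the assumed shape, not real log output.
SAMPLE_LINES = [
    "INFO unrelated line",
    "WARN Container [pid=1234,containerID=container_0000000000000_0001_01_000002]"
    " is running beyond physical memory limits. Killing container.",
]

if __name__ == "__main__":
    for container_id, kind in find_killed_containers(SAMPLE_LINES):
        print("%s exceeded its %s memory limit" % (container_id, kind))
```

If such entries line up in time with the socket timeouts, that points to executor memory rather than to the network.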