I have a PySpark job (Spark 2.4.1) that seems to work fine in about 10% of cases; the rest of the time it appears to be stuck forever on a single task, and I can't really understand what is happening. Here's what I'm doing in my PySpark code:

import pyspark.sql.functions as sf

df = ss.read.parquet(...)  # `ss` is the SparkSession
df2 = df.withColumn("A", my_python_udf(sf.col("position.latitude")))
print(df2.groupBy(sf.spark_partition_id()).count()
         .agg(sf.min("count"), sf.max("count"), sf.avg("count")).toPandas())

I seem to be stuck forever in the evaluation of the toPandas call. When I check the Executors tab, only one executor is runnable, with the following call stack:

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345) => holding Monitor(java.io.BufferedInputStream@1118259716})
java.io.DataInputStream.readFully(DataInputStream.java:195)
java.io.DataInputStream.readFully(DataInputStream.java:169)
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:74)
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
org.apache.spark.scheduler.Task.run(Task.scala:121)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

I have several questions:

  1. Why does it seem like the call stack is doing work related to the UDF evaluation, which is not needed for my computation? (See the sketch after this list for what I would expect to be an equivalent, UDF-free computation.)
  2. What is actually going on? From the call stack I can't tell whether that thread is deadlocked or live.
  3. How can I fix this?
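
To make question 1 concrete, this is the UDF-free computation I would expect to be equivalent, since column "A" is never referenced (a sketch only, using the same names as in the snippet above):

import pyspark.sql.functions as sf

# Same per-partition row-count statistics, computed on df directly,
# i.e. without ever selecting the UDF-derived column "A".
stats = (df.groupBy(sf.spark_partition_id()).count()
           .agg(sf.min("count"), sf.max("count"), sf.avg("count")))
print(stats.toPandas())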

Edit: I also have 2 executors failing with the following error:

java.io.IOException: expected more bytes in input stream
    at net.razorvine.pickle.PickleUtils.readbytes_into(PickleUtils.java:75)
    at net.razorvine.pickle.PickleUtils.readbytes(PickleUtils.java:55)
    at net.razorvine.pickle.Unpickler.load_binunicode(Unpickler.java:473)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:190)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:90)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

This makes me believe that something outside of my code is going wrong.

Did you manage to find out what is going on? :( – Gooseman

1 Answer


In my case, my PySpark jobs always (not randomly) got stuck in PythonUDFRunner waiting for data.

I found out it was waiting for data from the PySpark daemon, which is used to launch the Python code (Python UDFs) without paying the memory cost of forking a full Python process (UDF) from the Java process (Spark) for every task. More information in this link: pyspark daemon.
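
If you need to rule the daemon in or out, one thing you could try, purely as a debugging aid, is to make executors fork a plain Python worker per task instead of going through the daemon. Note that spark.python.use.daemon is an internal, undocumented setting, so treat this as a sketch rather than a supported API:

from pyspark.sql import SparkSession

# Debugging sketch: spark.python.use.daemon is an internal Spark setting
# (default true; ignored on Windows). Setting it to false makes each task
# launch a fresh Python worker instead of going through pyspark/daemon.py,
# which can help tell daemon problems apart from UDF problems.
spark = (SparkSession.builder
         .appName("udf-daemon-debug")
         .config("spark.python.use.daemon", "false")
         .getOrCreate())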

So, roughly speaking, the Python UDF is serialized by Spark and sent to this daemon, which is in charge of running the Python code.
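
Just to illustrate that serialization step: the function behind the UDF is pickled with the cloudpickle module that PySpark bundles and then shipped to the workers. The sketch below only mimics that round trip locally; it is not the actual wire protocol the daemon speaks:

import pickle
from pyspark import cloudpickle  # bundled with PySpark; used to ship UDF functions

def my_python_udf_fn(lat):
    # stand-in for the real UDF body
    return lat

payload = cloudpickle.dumps(my_python_udf_fn)   # roughly what the driver ships out
restored = pickle.loads(payload)                # roughly what the Python worker does with it
print(restored(1.23))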

This daemon is started from Python code shipped in a file called pyspark.zip.

In my case, for reasons that I prefer not to tell ;) ;), the pyspark.zip came from Spark 2.3.3, while Spark itself was running version 2.4.5. I replaced pyspark.zip with the one from Spark 2.4.5 and everything started to run successfully.
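
If you want to check whether you have a similar mismatch, a quick sanity check is to compare the PySpark version the driver imports with the one the executor-side Python workers import from the pyspark.zip they were given. This is only a sketch; it assumes your SparkSession ss is usable and that a trivial UDF completes at all:

import pyspark
from pyspark.sql import functions as sf
from pyspark.sql.types import StringType

def executor_pyspark_version(_):
    # Imported on the executor, so it reflects the pyspark.zip shipped to the workers.
    import pyspark as worker_pyspark
    return worker_pyspark.__version__

version_udf = sf.udf(executor_pyspark_version, StringType())

print("driver pyspark:", pyspark.__version__)
print(ss.range(1).select(version_udf("id").alias("executor_pyspark")).collect())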

I do not think you have exactly the same problem I had, but perhaps this gives you some ideas about what is going on in your setup.