I have a PySpark job (Spark 2.4.1) that works fine in about 10% of runs, but the rest of the time it appears to hang forever on a single task, and I can't figure out what is happening. Here's what I'm doing in my PySpark code:
```python
df = ss.read.parquet(...)
df2 = df.withColumn("A", my_python_udf(sf.col("position.latitude")))
print(df2.groupBy(sf.spark_partition_id()).count().agg(sf.min("count"), sf.max("count"), sf.avg("count")).toPandas())
```
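For context, `my_python_udf` is an ordinary row-at-a-time Python UDF registered roughly like this (the body below is an illustrative stand-in, not my real logic):

```python
import pyspark.sql.functions as sf
from pyspark.sql.types import DoubleType

# Illustrative stand-in: the real function is more involved,
# but it is a plain (non-Arrow) Python UDF like this one.
@sf.udf(returnType=DoubleType())
def my_python_udf(latitude):
    if latitude is None:
        return None
    return float(latitude)
```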
The job seems to be stuck forever evaluating the `toPandas` call. When I check the Executors tab, only one executor is runnable, blocked on a socket read with the following call stack:
```
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:171)
java.net.SocketInputStream.read(SocketInputStream.java:141)
java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.io.BufferedInputStream.read(BufferedInputStream.java:345) => holding Monitor(java.io.BufferedInputStream@1118259716})
java.io.DataInputStream.readFully(DataInputStream.java:195)
java.io.DataInputStream.readFully(DataInputStream.java:169)
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:74)
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
org.apache.spark.scheduler.Task.run(Task.scala:121)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
```
I have several questions:
- Why does the call stack show work related to the UDF evaluation, when column "A" is never used in my aggregation? (See the snippet after this list.)
- What is actually going on here? From the call stack alone I can't tell whether that thread is deadlocked or still making progress.
- How can I fix this?
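To illustrate the first point: since column "A" is never referenced downstream, I would expect a UDF-free version like the following to compute the exact same numbers (a sketch of what I mean, not something I have benchmarked):

```python
# Same per-partition row counts, but without ever materializing the UDF column.
counts = df.groupBy(sf.spark_partition_id()).count()
print(counts.agg(sf.min("count"), sf.max("count"), sf.avg("count")).toPandas())
```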
Edit: I also have two executors failing with the following error:
```
java.io.IOException: expected more bytes in input stream
	at net.razorvine.pickle.PickleUtils.readbytes_into(PickleUtils.java:75)
	at net.razorvine.pickle.PickleUtils.readbytes(PickleUtils.java:55)
	at net.razorvine.pickle.Unpickler.load_binunicode(Unpickler.java:473)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:190)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:90)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$evaluate$1.apply(BatchEvalPythonExec.scala:89)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
This makes me believe that something outside of my code is going wrong.