2 votes

The first stage of my Spark job is quite simple; a rough code sketch follows the list below.

  1. It reads from a large number of files (around 30,000 files, 100GB in total) -> RDD[String]
  2. does a map (to parse each line) -> RDD[Map[String,Any]]
  3. filters -> RDD[Map[String,Any]]
  4. coalesces (.coalesce(100, true))
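
Roughly, the code for this stage looks like the sketch below (parseLine and keepRecord are placeholders for my real parsing and filtering logic, and sc is the SparkContext):

    import org.apache.spark.rdd.RDD

    // Placeholders for the real parsing and filtering logic
    def parseLine(line: String): Map[String, Any] = ???
    def keepRecord(record: Map[String, Any]): Boolean = ???

    val lines: RDD[String] = sc.textFile("hdfs:///data/input/*")      // ~30,000 files, ~100GB in total
    val parsed: RDD[Map[String, Any]] = lines.map(parseLine)          // parse each line
    val kept: RDD[Map[String, Any]] = parsed.filter(keepRecord)

    // shuffle = true, so this introduces a shuffle boundary instead of
    // merging the ~33,000 input partitions locally
    val result = kept.coalesce(100, shuffle = true)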

When running it, I observe a quite peculiar behavior. The number of executors grows until it reaches the limit I specified in spark.dynamicAllocation.maxExecutors (typically 100 or 200 in my application). Then it starts decreasing quickly (at approximately 14000/33428 tasks) and only a few executors remain. They are killed by the driver. Once this is done, the number of executors increases back to its maximum value.
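
For context, the job runs with dynamic allocation enabled, configured roughly like this (simplified; the exact maxExecutors value varies between runs):

    import org.apache.spark.SparkConf

    // Simplified view of the dynamic allocation settings I use
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")        // needed for dynamic allocation on YARN
      .set("spark.dynamicAllocation.maxExecutors", "100")  // sometimes 200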

Below is a screenshot of the number of executors at its lowest.

[Screenshot: number of executors at its lowest]

And here is a screenshot of the task summary.

[Screenshot: task summary]

I guess that these executors are killed because they are idle. But in that case, I do not understand why they would become idle. There are still a lot of tasks left to do in the stage...

Do you have any idea why this happens?

EDIT

More details from the driver logs when an executor is killed:

16/09/30 12:23:33 INFO cluster.YarnClusterSchedulerBackend: Disabling executor 91.
16/09/30 12:23:33 INFO scheduler.DAGScheduler: Executor lost: 91 (epoch 0)
16/09/30 12:23:33 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 91 from BlockManagerMaster.
16/09/30 12:23:33 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(91, server.com, 40923)
16/09/30 12:23:33 INFO storage.BlockManagerMaster: Removed 91 successfully in removeExecutor
16/09/30 12:23:33 INFO cluster.YarnClusterScheduler: Executor 91 on server.com killed by driver.
16/09/30 12:23:33 INFO spark.ExecutorAllocationManager: Existing executor 91 has been removed (new total is 94)

Logs on the executor:

16/09/30 12:26:28 INFO rdd.HadoopRDD: Input split: hdfs://...
16/09/30 12:26:32 INFO executor.Executor: Finished task 38219.0 in stage 0.0 (TID 26519). 2312 bytes result sent to driver
16/09/30 12:27:33 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/09/30 12:27:33 INFO storage.DiskBlockManager: Shutdown hook called
16/09/30 12:27:33 INFO util.ShutdownHookManager: Shutdown hook called
Do you have more logs? On the machines themselves? Browse around in the web UI some. – Reactormonk
I have added driver and executor logs. – Pop
"16/09/30 12:27:33 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM" - find out why. OOM killer? Something else? Check dmesg and similar. – Reactormonk
No message about memory... And in the end, the job finishes successfully, so I guess there is no OOM; otherwise the job would fail, wouldn't it? The logs I posted above are really the only unusual ones. – Pop
Without reproduction, I can't tell you much more than that. :-( – Reactormonk

1 Answer

0 votes

I'm seeing this problem with executors that are killed as a result of an idle timeout. I have an exceedingly demanding computational load, but it's mostly computed in a UDF, invisible to Spark. I believe there is some Spark parameter that can be adjusted.

Try looking through the spark.executor parameters in https://spark.apache.org/docs/latest/configuration.html#spark-properties and see if anything jumps out.
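
As a sketch of the kind of parameter I mean, raising the dynamic-allocation idle timeout (which defaults to 60s) would look like this; I haven't verified that this is the exact knob in your case:

    import org.apache.spark.SparkConf

    // Keep idle executors around longer so dynamic allocation does not
    // reclaim them while a few long tasks are still running elsewhere
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.executorIdleTimeout", "300s")  // default: 60s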