2
votes

I am trying to use Scala's Futures API to run certain actions in parallel. Below is a sample code snippet:

import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession

import scala.concurrent._
import scala.util._

object ConcurrentContext {
  def appMain(args: Array[String]) = {
    // configure spark
    val spark = SparkSession
      .builder
      .appName("parjobs")
      .getOrCreate()

    val pool = Executors.newFixedThreadPool(5)
    // create the implicit ExecutionContext based on our thread pool
    implicit val xc = ExecutionContext.fromExecutorService(pool)

    /** Wraps a code block in a Future and returns the future */
    def executeAsync[T](f: => T): Future[T] = {
      Future(f)
    }
  }
}

My questions are:

  1. If I set the executor-cores value to 4, which controls the number of task threads per executor JVM, and create a thread pool of 5 inside the application, which one takes precedence?

  2. If I don't explicitly set the thread pool, the default ExecutionContext will create a thread pool based on all the cores present on the machine where the process is initiated (which would be the driver). In that situation, how does the executor-cores property come into play?

  3. If the thread-pool size takes precedence over executor-cores, and I use the default, could there be many threads (equal to the number of CPU cores) per JVM?

1
Keep in mind that future is eager... - Emiliano Martinez
The properties you set using SparkSession (including spark.executor.cores) only affects the parallelism of tasks Spark performs on a distributed data-structure (RDD, DataFrame, Dataset). Everything else you do, is on the driver machine itself and independent of Spark; and you can control its parallelism using ThreadPools etc. - suj1th
@EmiCareOfCell44 - can you please explain what you mean by "future is eager" ? - Jayadeep Jayaraman
Check this post iravid.com/posts/fp-and-spark.html , maybe it can be helpful - Emiliano Martinez
@suj1th - I think I got what you were saying: the main driver JVM will spawn multiple threads based on the thread pool, and each thread will have access to the same SparkContext to run multiple Future calls in parallel. Is my understanding correct? - Jayadeep Jayaraman
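As the first comment notes, a Scala Future is eager: its body starts running the moment the Future is constructed, not when it is awaited or composed. A minimal standalone sketch of that behaviour (plain Scala, no Spark involved):

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val started = new AtomicBoolean(false)

// The body begins executing as soon as the Future is created;
// there is no separate "start" or "run" step.
val f = Future { started.set(true); 42 }

// Awaiting only retrieves the result; the work was already in flight.
val result = Await.result(f, 5.seconds)
println(result) // 42
```

This matters here because every `executeAsync(...)` call kicks off its Spark action immediately; wrapping more blocks than the pool has threads just queues them.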

1 Answer

2
votes

If I set executor-cores value to 4 which controls the number of threads per executor JVM and create a thread pool of 5 inside the application

When you execute a Spark application you have the driver and one or more executors. For the sake of simplicity, let's assume you have one executor only.

You have 4 CPUs for the executor.

How many tasks can you run in parallel with 4 CPUs? 4 exactly!

The driver is the part of the Spark application that runs the thread pool of 5 threads. For the sake of simplicity, let's assume that all 5 are used.

How many Spark jobs can you schedule? 5 exactly!

Every Spark job can have one or more stages with one or more partitions to process using tasks. For the sake of simplicity, let's assume that all 5 Spark jobs have 1 stage with 1 partition (which is highly unlikely, but just to give you some idea how Spark works it should be fine).

Remember that 1 partition is exactly 1 task.

How many tasks will the Spark application submit? 5 jobs with 1 task each gives 5 tasks.

How much time does it take to execute all 5 tasks on a 4-CPU executor? 2 time slots (whatever "time slot" could mean): 4 tasks run in the first slot, and the remaining task in the second.

That's the relationship between executor cores/CPUs and the thread pool of 5 threads on the driver.
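A minimal driver-side sketch of the setup described above. Here `runJob` is a hypothetical stand-in for whatever Spark action each Future would trigger (e.g. a count on a DataFrame); the 5-thread pool bounds how many jobs are submitted concurrently, while the executor's cores bound how many tasks actually run at once:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A 5-thread pool on the driver: up to 5 Spark jobs can be submitted
// concurrently, but each job's tasks still compete for the executor cores.
val pool = Executors.newFixedThreadPool(5)
implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Hypothetical placeholder for a Spark action such as df.count()
def runJob(id: Int): Long = id.toLong * 100

// Submit 5 "jobs" in parallel and collect their results
val jobs = (1 to 5).map(i => Future(runJob(i)))
val results = Await.result(Future.sequence(jobs), 1.minute)
println(results.sum) // 1500

pool.shutdown()
</tail>
```

The two knobs are independent: the pool controls concurrency of job submission on the driver; executor-cores controls task parallelism on each executor.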

If I don't explicitly set the thread pool then the default ExecutionContext will create a default thread pool based on all the cores present on the machine from where the process is initiated (which would be the driver), in that situation how would the executor-core property impact?

I think the above part answers this question too.
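To make that concrete: the default pool behind `ExecutionContext.Implicits.global` is sized from the cores visible to the JVM it runs in, which for this code is the driver machine, not the executors. You can inspect that number yourself (plain Scala, no Spark needed):

```scala
// The default global ExecutionContext sizes its pool from the cores the
// driver JVM can see; spark.executor.cores has no influence on it.
val driverCores = Runtime.getRuntime.availableProcessors
println(driverCores)
```

So executor-cores still only governs task parallelism on the executors, regardless of how the driver-side pool was sized.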

If the thread pool value takes precedence over executor-core and if I use the default value is there a possibility that there are many threads(equal to CPU cores) per JVM?

And yes: with the default ExecutionContext, the driver JVM can end up with as many threads as the driver machine has CPU cores. Correct?