I have a 4-node Spark Standalone cluster with a Spark Streaming job running on it.

When I submit the job with 7 cores per executor everything runs smoothly:

spark-submit --class com.test.StreamingJob --supervise --master spark://{SPARK_MASTER_IP}:7077 --executor-memory 30G --executor-cores 7 --total-executor-cores 28 /path/to/jar/spark-job.jar

When I increase to 24 cores per executor, none of the batches get processed and I see java.lang.OutOfMemoryError: unable to create new native thread in the executor logs. The executors then keep failing:

spark-submit --class com.test.StreamingJob --supervise --master spark://{SPARK_MASTER_IP}:7077 --executor-memory 30G --executor-cores 24 --total-executor-cores 96 /path/to/jar/spark-job.jar

Error:

17/01/12 16:01:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Shutdown-checker,5,main]
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:714)
        at io.netty.util.concurrent.SingleThreadEventExecutor.shutdownGracefully(SingleThreadEventExecutor.java:534)
        at io.netty.util.concurrent.MultithreadEventExecutorGroup.shutdownGracefully(MultithreadEventExecutorGroup.java:146)
        at io.netty.util.concurrent.AbstractEventExecutorGroup.shutdownGracefully(AbstractEventExecutorGroup.java:69)
        at com.datastax.driver.core.NettyOptions.onClusterClose(NettyOptions.java:190)
        at com.datastax.driver.core.Connection$Factory.shutdown(Connection.java:844)
        at com.datastax.driver.core.Cluster$Manager$ClusterCloseFuture$1.run(Cluster.java:2488)

I found this question and tried upping the ulimits substantially but it had no effect.
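
Since raising the ulimits made no difference, it may help to confirm how many threads the executor JVM actually holds before comparing that number with the limits in effect. The following is a minimal sketch using the standard java.lang.management API. The class name ThreadCountProbe is hypothetical, and where to call it inside the streaming job (for example once per batch) is an assumption, not something taken from the job above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: reports how many threads the current JVM is holding.
class ThreadCountProbe {

    /** Number of live threads (daemon and non-daemon) in this JVM right now. */
    static int liveThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return threads.getThreadCount();
    }

    /** Highest live thread count seen since the JVM started (or since the peak was reset). */
    static int peakThreads() {
        return ManagementFactory.getThreadMXBean().getPeakThreadCount();
    }

    public static void main(String[] args) {
        System.out.println("live threads: " + liveThreads());
        System.out.println("peak threads: " + peakThreads());
    }
}
```

If the peak value climbs steadily from batch to batch, that would point to threads being created and never released rather than to a limit that is simply set too low.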

Each box has 32 cores and 61.8 GB of memory. The streaming job is written in Java and runs on Spark 2.0.0, connecting to Cassandra 3.7.0 with spark-cassandra-connector-java_2.10 1.5.0-M2.

The data is a very small trickle of less than 100 events per second, each of which is less than 200 B.
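
For reference, the resource arithmetic behind the two submit commands can be sketched as below. This is an illustration only: the class and method names are hypothetical, and it assumes the standalone scheduler launches --total-executor-cores divided by --executor-cores executors, which matches the one-executor-per-worker layout described in this thread.

```java
// Hypothetical helper for the cluster arithmetic in this question.
class ClusterMath {

    /** Executors requested, assuming total cores are split evenly across executors. */
    static int executorCount(int totalExecutorCores, int executorCores) {
        return totalExecutorCores / executorCores;
    }

    /** Total executor heap requested across the cluster, in GB. */
    static int totalExecutorMemoryGb(int executors, int executorMemoryGb) {
        return executors * executorMemoryGb;
    }

    /** Upper bound on the input rate in bytes per second. */
    static long bytesPerSecond(long eventsPerSecond, long bytesPerEvent) {
        return eventsPerSecond * bytesPerEvent;
    }

    public static void main(String[] args) {
        int executorsAt7 = executorCount(28, 7);    // first command
        int executorsAt24 = executorCount(96, 24);  // second command
        System.out.println("executors at 7 cores each: " + executorsAt7);
        System.out.println("executors at 24 cores each: " + executorsAt24);
        System.out.println("total executor memory (GB): "
                + totalExecutorMemoryGb(executorsAt24, 30));
        System.out.println("input rate upper bound (B/s): " + bytesPerSecond(100, 200));
    }
}
```

Under that assumption both commands request 4 executors and 120 GB of executor memory in total, against roughly 247 GB across the four boxes, and the input rate stays below 20,000 bytes per second.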

What is the size of the data? Could you check the total RAM per node and the number of cores per node in your cluster? - Sandeep Singh
So the problem is that you are assigning 24 executor cores per machine and 30 GB of memory per executor, so the total memory you are assigning is 720 GB. Your cluster has only 247 GB of memory, which is why you are getting the out-of-memory exception :) - Sandeep Singh
@sandeep-singh There is only one executor per worker. Those are per executor limits. - Kevin
Yeah, your data is small, so you should not assign more executors. But since you are running a streaming job you can increase them, although assigning more cores would be more beneficial. - Sandeep Singh
You can also measure the processing time while increasing and decreasing the number of executors to find the best performance. - Sandeep Singh

1 Answer

Sounds like you are running out of memory ;).

For a little more detail, the number of cores in use by Spark is directly tied to the amount of data being worked on in parallel. You can think of each core as working on one full Spark partition's data, which may need to reside entirely in memory.

7 cores per executor means 7 Spark partitions are being worked on simultaneously. Bumping this number up to 24 means roughly 3.4 times as many partitions in flight, and potentially that much more RAM in use. This could easily cause an OOM in various places.
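
The scaling described above can be made concrete with a rough estimate. This is a sketch under a simplifying assumption: it divides the executor heap evenly among concurrently running tasks, which is only a back-of-the-envelope view and not how Spark's memory manager actually allocates memory. The class and method names are hypothetical.

```java
// Hypothetical back-of-the-envelope estimate; not Spark's real memory model.
class TaskMemoryEstimate {

    /** How many times more tasks run concurrently after changing cores per executor. */
    static double concurrencyRatio(int oldCores, int newCores) {
        return (double) newCores / oldCores;
    }

    /** Naive even share of the executor heap per concurrently running task, in GB. */
    static double heapPerTaskGb(double executorMemoryGb, int executorCores) {
        return executorMemoryGb / executorCores;
    }

    public static void main(String[] args) {
        System.out.printf("concurrency ratio: %.2f%n", concurrencyRatio(7, 24));
        System.out.printf("heap per task at 7 cores: %.2f GB%n", heapPerTaskGb(30.0, 7));
        System.out.printf("heap per task at 24 cores: %.2f GB%n", heapPerTaskGb(30.0, 24));
    }
}
```

With the same 30 GB heap, each task's naive share drops from about 4.29 GB at 7 cores to 1.25 GB at 24 cores.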

There are a few ways to deal with this.

  1. Allocate more memory to the executor JVMs
  2. Shrink the size of the Spark partitions (smaller partitions mean less data in memory at any given time)
  3. Make sure you aren't caching any RDDs in memory (and thus exhausting the system resources)
  4. Reduce the amount of data you are working with, by taking subsets or filtering at the server before the data reaches Spark.