I have a total of 6 nodes in my Spark cluster. 5 of the nodes have 4 cores and 32 GB RAM each, and one node (node 4) has 8 cores and 32 GB RAM.

So in total I have 6 nodes, 28 cores and 192 GB RAM. (I want to use half of the memory, but all of the cores.)

I am planning to run 5 Spark applications on the cluster.

My spark-defaults.conf is as below:

spark.master                     spark://***:7077
spark.eventLog.enabled           false
spark.driver.memory              2g
worker_max_heapsize              2g
spark.kryoserializer.buffer.max.mb      128
spark.shuffle.file.buffer.kb    1024
spark.cores.max                 4
spark.dynamicAllocation.enabled true

I want to use at most 16 GB on each node and run 4 worker instances on each machine by setting the configuration below. So I expected (4 instances * 6 nodes = 24) workers on my cluster. Together they would use all 28 cores and up to 96 GB RAM.

My spark-env.sh is as below:

export SPARK_WORKER_MEMORY=16g
export SPARK_WORKER_INSTANCES=4
SPARK_LOCAL_DIRS=/app/spark/spark-1.6.1-bin-hadoop2.6/local
SPARK_WORKER_DIR=/app/spark/spark-1.6.1-bin-hadoop2.6/work

But when my Spark cluster started, the Spark UI showed the following workers running:

Worker Id (Address)     State   Cores   Memory
worker-node4-address    ALIVE   8 (1 Used)  16.0 GB (0.0 GB Used)
worker-node4-address    ALIVE   8 (1 Used)  16.0 GB (0.0 GB Used)
worker-node4-address    ALIVE   8 (1 Used)  16.0 GB (0.0 GB Used)
worker-node4-address    ALIVE   8 (0 Used)  16.0 GB (0.0 B Used)
worker-node4-address    ALIVE   8 (1 Used)  16.0 GB (0.0 GB Used)
worker-node1-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node1-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node1-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node1-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)

worker-node2-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node2-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node2-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node2-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)

worker-node3-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node3-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node3-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node3-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)

worker-node5-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node5-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node5-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node5-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)

worker-node6-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node6-address    ALIVE   4 (3 Used)  16.0 GB (0.0 GB Used)
worker-node6-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)
worker-node6-address    ALIVE   4 (0 Used)  16.0 GB (0.0 B Used)

But the master UI is showing (when no applications are running):

Alive Workers: 25
Cores in use: 120 Total, 0 Used
Memory in use: 400.0 GB Total, 0.0 GB Used
Status: ALIVE

When I am expecting 24 workers (4 per node), why are there 25? The extra one is on node 4, which has 8 cores.

When I assigned a maximum of 16 GB to be used on each node, why is it showing Memory in use: 400.0 GB Total?

Why does the UI show 120 cores when I only have 28 cores in my cluster?

Could you please tell me what kind of Spark configuration my system should have?

How many cores and how much executor memory should I specify when I submit a Spark job?

What is the spark.cores.max parameter? Is it per node or for the whole cluster?

I ran 3 applications with the spark-submit configuration --executor-memory 2G --total-executor-cores 4. At least one of my applications gives the error below and fails.

Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:714)
        at scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
        at scala.concurrent.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
        at scala.concurrent.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
        at scala.concurrent.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
        at scala.concurrent.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
        at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:120)
        at scala.concurrent.impl.Future$.apply(Future.scala:31)
        at scala.concurrent.Future$.apply(Future.scala:485)
        at org.apache.spark.deploy.rest.RestSubmissionClient.readResponse(RestSubmissionClient.scala:232)
        at org.apache.spark.deploy.rest.RestSubmissionClient.org$apache$spark$deploy$rest$RestSubmissionClient$$postJson(RestSubmissionClient.scala:222)
        at org.apache.spark.deploy.rest.RestSubmissionClient$$anonfun$createSubmission$3.apply(RestSubmissionClient.scala:87)
        at org.apache.spark.deploy.rest.RestSubmissionClient$$anonfun$createSubmission$3.apply(RestSubmissionClient.scala:83)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.deploy.rest.RestSubmissionClient.createSubmission(RestSubmissionClient.scala:83)
        at org.apache.spark.deploy.rest.RestSubmissionClient$.run(RestSubmissionClient.scala:411)
        at org.apache.spark.deploy.rest.RestSubmissionClient$.main(RestSubmissionClient.scala:424)
        at org.apache.spark.deploy.rest.RestSubmissionClient.main(RestSubmissionClient.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

1 Answer

As far as I know, you should only start one worker per node:

http://spark.apache.org/docs/latest/hardware-provisioning.html

You only need more than one worker per node if that node has more than 200 GB of RAM, and you don't have 200 GB of RAM per node. Could you set this in spark-env.sh on the nodes which have only 4 cores?

export SPARK_EXECUTOR_CORES=4
export SPARK_EXECUTOR_MEMORY=16GB
export SPARK_MASTER_HOST=<Your Master-Ip here>

And this at the node which has 8 cores:

export SPARK_EXECUTOR_CORES=8
export SPARK_EXECUTOR_MEMORY=16GB
export SPARK_MASTER_HOST=<Your Master-Ip here>
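After changing spark-env.sh on every node, the master and the workers have to be restarted before the new settings take effect. A minimal sketch, assuming the standard standalone scripts, that the workers are listed in conf/slaves with passwordless SSH from the master, and the install path from your question:

cd /app/spark/spark-1.6.1-bin-hadoop2.6
sbin/stop-all.sh     # stops the master and all workers listed in conf/slaves
sbin/start-all.sh    # starts them again, picking up the new spark-env.sh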

And this at the master node in spark-defaults.conf:

spark.driver.memory              2g

I think you should try this and comment out the other configurations for a test. Is this then what you want? Your cluster should now use 96 GB and 28 cores in total. You can then start your application without --executor-memory 2G --total-executor-cores 4. But a java.lang.OutOfMemoryError can happen even without a wrong configuration; it can also happen when you collect too much data to the driver.
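For example, a rough sketch in spark-shell (where sc is predefined; the paths are hypothetical, not from your question) of how the driver can run out of memory even though the executors are fine:

val rdd = sc.textFile("hdfs:///data/big-input")   // hypothetical large input
// risky: collect() pulls every record into the 2 GB driver JVM
val everything = rdd.collect()
// safer: inspect a small sample, or let the executors write the result out
val sample = rdd.take(100)
rdd.saveAsTextFile("hdfs:///data/output")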

And yes, every worker has 16 GB of RAM in your current config, so 25 workers * 16 GB = 400 GB in total.