2
votes

I am trying to execute my spark scala application on an AWS EMR cluster, by creating a step Application Spark.

My Cluster contains 4 m3.xlarge

I start my application using this command :

spark-submit --deploy-mode cluster --class Main s3://mybucket/myjar_2.11-0.1.jar s3n://oc-mybucket/folder arg1 arg2

My application takes 3 parameters, the first one is a folder.

Unfortunatly after starting the application I see that only one Executor (+the master) who are active and I have 3 Executors dead, so all tasks are working only on the first one. see image

enter image description here

I tried many ways to activate thoses excutor but without any result ( "spark.default.parallelism, ""spark.executor.instances" and "spark.executor.cores"). What should I do so all the executor would be active and processing data ?

Also, when looking at Ganglia i have cpu always under 35%, is there a way so the cpu would be wokring more than 75% ?

thank you

UPDTAE

this is the stderr content of dead executors

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/14/__spark_libs__3671437061469038073.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/08/15 23:28:56 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14765@ip-172-31-39-255
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for TERM
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for HUP
20/08/15 23:28:56 INFO SignalUtils: Registered signal handler for INT
20/08/15 23:28:57 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/08/15 23:28:57 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/08/15 23:28:57 INFO SecurityManager: Changing view acls groups to: 
20/08/15 23:28:57 INFO SecurityManager: Changing modify acls groups to: 
20/08/15 23:28:57 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/08/15 23:28:58 INFO TransportClientFactory: Successfully created connection to ip-172-31-36-83.eu-west-1.compute.internal/172.31.36.83:37115 after 186 ms (0 ms spent in bootstraps)
20/08/15 23:28:58 INFO SecurityManager: Changing view acls to: yarn,hadoop
20/08/15 23:28:58 INFO SecurityManager: Changing modify acls to: yarn,hadoop
20/08/15 23:28:58 INFO SecurityManager: Changing view acls groups to: 
20/08/15 23:28:58 INFO SecurityManager: Changing modify acls groups to: 
20/08/15 23:28:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
20/08/15 23:28:58 INFO TransportClientFactory: Successfully created connection to ip-172-31-36-83.eu-west-1.compute.internal/172.31.36.83:37115 after 2 ms (0 ms spent in bootstraps)
20/08/15 23:28:58 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1597532473783_0002/blockmgr-d0d258ba-4345-45d1-9279-f6a97b63f81c
20/08/15 23:28:58 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1597532473783_0002/blockmgr-e7ae1e29-85fa-4df9-acf1-f9923f0664bc
20/08/15 23:28:58 INFO MemoryStore: MemoryStore started with capacity 2.6 GB
20/08/15 23:28:59 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@ip-172-31-36-83.eu-west-1.compute.internal:37115
20/08/15 23:28:59 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/08/15 23:28:59 INFO Executor: Starting executor ID 3 on host ip-172-31-39-255.eu-west-1.compute.internal
20/08/15 23:28:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40501.
20/08/15 23:28:59 INFO NettyBlockTransferService: Server created on ip-172-31-39-255.eu-west-1.compute.internal:40501
20/08/15 23:28:59 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/08/15 23:29:00 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:00 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:00 INFO BlockManager: external shuffle service port = 7337
20/08/15 23:29:00 INFO BlockManager: Registering executor with local external shuffle service.
20/08/15 23:29:00 INFO TransportClientFactory: Successfully created connection to ip-172-31-39-255.eu-west-1.compute.internal/172.31.39.255:7337 after 20 ms (0 ms spent in bootstraps)
20/08/15 23:29:00 INFO BlockManager: Initialized BlockManager: BlockManagerId(3, ip-172-31-39-255.eu-west-1.compute.internal, 40501, None)
20/08/15 23:29:03 INFO CoarseGrainedExecutorBackend: eagerFSInit: Eagerly initialized FileSystem at s3://does/not/exist in 3363 ms
20/08/15 23:30:02 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/08/15 23:30:02 INFO DiskBlockManager: Shutdown hook called
20/08/15 23:30:02 INFO ShutdownHookManager: Shutdown hook called

is this problem have to be with memory ?

2
You should look at the stderr of dead executors, and see if there are any errors. Then, maybe post a stacktrace here. - mazaneicha
@mazaneicha after checking all thoses dead executors i found that they contains the same error, content updated - Issibra
Not sure what Eagerly initialized FileSystem at s3://does/not/exist means, but clearly your executors were commanded to shut down after 1 minute, a default idle timeout. - mazaneicha

2 Answers

1
votes

You dont use all executors by default by spark-submit, you can specify the number of executors --num-executors, executor-core and executor-memory.

For instance, to increase the executors(which by default are 2)

spark-submit --num-executors N   #where N is desired number of executors like 5,10,50

See example in docs here

If it doesnt help or overides with spark-submit, you can override spark.executor.instances in conf/spark-defaults.conf file or similar so you don't have to specify it explicitly on the command line

For CPU utilization, you should look into executor-core and executor-core and either change them in spark-submit or conf. Increasing cpu cores will increase the usage hopefully.

Update:

As pointed by @Lamanus and I double checked, emr greater than 4.4 have spark.dynamicAllocation.enabled set to true, I suggest you to double check the partitions of your data, since having Dynamic allocation enabled the number of executor instances depends on by number of partitions, which vary according to stage in DAG execution. Also, with dynamic allocation, you can try out spark.dynamicAllocation.initialExecutors , spark.dynamicAllocation.maxExecutors , spark.dynamicAllocation.maxExecutors to control executors.

0
votes

This maybe a bit late but I found this AWS Big Data blog insightful to ensure that most of my cluster is utilised and that I'm able to achieve as much parallelism as possible.

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

More specifically:

Number of executors per instance = (total number of virtual cores per instance - 1)/ spark.executors.cores

Total executor memory = total RAM per instance / number of executors per instance

You can then control the number of parallel tasks during stages using spark.default.parallelism or repartitioning.