I'm trying to use EMR for crawling. The target server recognizes the client IP, so I want to run exactly one executor on each core node. Currently, I have one master node and two core nodes. The core nodes are c4.large instances, which have two vCores each, so I need to change the settings. (With the default settings, two executors would run on one core node.)
Here is the configuration for my cluster.
[
  {"classification": "spark",
   "properties": {"maximizeResourceAllocation": "true"},
   "configurations": []},
  {"classification": "yarn-site",
   "properties": {
     "yarn.nodemanager.resource.cpu-vcores": "1",
     "yarn.nodemanager.resource.memory-mb": "3584",
     "yarn.scheduler.maximum-allocation-vcores": "1",
     "yarn.scheduler.maximum-allocation-mb": "3584"},
   "configurations": []},
  {"classification": "mapred-site",
   "properties": {
     "mapreduce.map.memory.mb": "3584",
     "mapreduce.map.cpu.vcores": "1"},
   "configurations": []}
]
And here is the run script.
spark-submit \
  --conf spark.hadoop.parquet.enable.dictionary=true \
  --conf spark.hadoop.parquet.enable.summary-metadata=false \
  --conf spark.sql.hive.metastorePartitionPruning=true \
  --conf spark.sql.parquet.filterPushdown=true \
  --conf spark.sql.parquet.mergeSchema=true \
  --conf spark.worker.cleanup.enabled=true \
  --conf spark.executorEnv.LD_LIBRARY_PATH="/usr/local/lib:$LD_LIBRARY_PATH" \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=3200m \
  --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 \
  extract_spark.py news_new data
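For reference, here is a rough sketch of the memory arithmetic behind these settings. It assumes the Spark-on-YARN default for executor memory overhead, max(384 MB, 10% of the executor memory), and treats YARN's allocation rounding as a parameter because the value on this cluster is not known. The function names are hypothetical helpers, not part of Spark or YARN.

```python
import math


def executor_container_mb(executor_memory_mb, overhead_factor=0.10,
                          min_overhead_mb=384, increment_mb=1):
    """Estimate the YARN container size requested for one executor.

    Assumption: overhead = max(min_overhead_mb, overhead_factor * memory),
    and YARN rounds the request up to a multiple of increment_mb
    (the cluster's actual rounding increment is not known here).
    """
    overhead = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    requested = executor_memory_mb + overhead
    return int(math.ceil(requested / increment_mb) * increment_mb)


def executors_per_node(node_memory_mb, container_mb):
    """How many executor containers fit in one NodeManager's memory budget."""
    return node_memory_mb // container_mb


if __name__ == "__main__":
    # spark.executor.memory=3200m from the run script above
    container = executor_container_mb(3200)
    # yarn.nodemanager.resource.memory-mb=3584 from the cluster configuration
    print(container, executors_per_node(3584, container))
```

Under these assumptions, a 3200 MB executor asks for a 3584 MB container, which fills the whole 3584 MB budget of a node. This counts executor containers only; any other container placed on a node, such as the YARN application master, would draw on the same budget.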
Lastly, here is the code snippet.
numbers = sc.parallelize(list(range(100)))
contents = numbers.flatMap(lambda n: get_contents(args.id, n)).toDF()
contents.coalesce(2).write.mode('append').parquet(
    os.path.join(args.path, args.id))
The job uses only one core node: the two map tasks are executed in sequence on that node. The core node that gets used is selected randomly, so I guess both core nodes are ready to be used.
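One way to confirm which hosts the tasks land on is to tag each record with the hostname of the machine that processed it. The following is only a sketch: `tag_with_host` is a hypothetical helper, and the Spark usage in the trailing comment assumes the `sc` from the snippet above.

```python
import socket


def tag_with_host(partition):
    """Yield (hostname, item) pairs for every item in a partition.

    The hostname is resolved where this function runs, so inside
    mapPartitions it reports the host of the executor, not the driver.
    """
    host = socket.gethostname()
    for item in partition:
        yield (host, item)


# Possible usage with Spark (assumes `sc` exists, as in the snippet above):
#   counts = (sc.parallelize(list(range(100)), 2)
#               .mapPartitions(tag_with_host)
#               .countByKey())
#   print(dict(counts))  # number of records handled per host
```

If only one hostname shows up in the counts, the tasks really did run on a single node rather than merely being reported that way in the UI.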
How can I run two tasks on two core nodes in parallel?