I am not able to configure YARN and Spark to utilize all the resources on my Dataproc Spark cluster on GCP.
I am running a cluster with 1 master (4 cores) and 2 workers (16 cores each), and I want my Spark application to use 30 of the 32 cores available on the worker instances. But the YARN UI says that only 5 cores are in use, the Spark Executors UI says that 20 cores are in use, and there is hardly any CPU activity on the workers themselves.
I am utterly confused. Please help.
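To spell out the arithmetic behind the request (these are my own numbers, not values reported by the cluster):

// My intended request, as I understand it:
// 6 executors x 5 cores = 30 cores, against 2 x 16 = 32 worker cores.
// 6 executors x 14g     = 84 GB of executor memory, before any YARN overhead,
// against 2 x 60 GB of raw RAM on the n1-standard-16 workers (I have not
// checked how much of that YARN actually exposes per node).
val requestedCores  = 6 * 5   // 30
val requestedMemory = 6 * 14  // 84 (GB)
val workerCores     = 2 * 16  // 32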
The command to create the Dataproc cluster:
gcloud dataproc clusters create default-cluster \
--region europe-west1 --subnet default --zone europe-west1-d \
--master-machine-type n1-standard-4 --master-boot-disk-size 500 \
--num-workers 2 --worker-machine-type n1-standard-16 --worker-boot-disk-size 500 \
--image-version 1.4-debian9 \
--project product-normalyzr
The command to submit the job:
gcloud dataproc jobs submit spark --cluster=default-cluster \
--region=europe-west1 \
--properties=spark.executor.instances=6,spark.executor.cores=5,spark.executor.memory=14g \
--jars=dist/yzr-core-scala_2.11-0.1.jar,dist/deps/gson-2.8.6.jar,dist/deps/picocli-4.2.0.jar \
--class=solutions.yzr.pnormalyzr.similarity.Main
The way I am creating the Spark session:
def buildSession(appName: String): SparkSession = {
  SparkSession
    .builder()
    .appName(appName)
    .getOrCreate()
}
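In case it matters, my understanding is that the same properties could also be set directly on the builder instead of (or in addition to) the --properties flag of the submit command. This is only a sketch with the same values, not something I have actually deployed:

import org.apache.spark.sql.SparkSession

def buildSessionWithResources(appName: String): SparkSession = {
  SparkSession
    .builder()
    .appName(appName)
    // Same values as in --properties on the submit command.
    .config("spark.executor.instances", "6")
    .config("spark.executor.cores", "5")
    .config("spark.executor.memory", "14g")
    .getOrCreate()
}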
In case the problem is linked to the Spark logic itself (maybe partitioning or something), I am also including the main part of the Spark app code below, followed by a partitioning sketch. I doubt this is the cause, because when I run it locally on my machine the CPU usage completely explodes, which is exactly what I expect to see on the worker nodes.
println("Load features")
val features = session.sparkContext.textFile(inputPath)
.map((rawText: String) => {
new Gson().fromJson(rawText, classOf[DocFeatures])
})
features.take(5).foreach(println)
println("Compute Scores")
val scores = features.cartesian(features)
// compute similarity
.map((d: (DocFeatures, DocFeatures)) => {
val docA = d._1
val docB = d._2
val (score, explain) = SimilarityMetric.score(docA, docB)
SimilarityScore(
pA = docA.slug,
pB = docB.slug,
score = score,
explain = explain)
})
// filter items with no similarity
.filter(s => s.score > 0)
scores.take(5).foreach(println)
println("Export")
// store to disk
val scoreStrings = scores.map(d => {
new Gson().toJson(d)
})
scoreStrings.take(5).foreach(println)
scoreStrings.saveAsTextFile(outputPath)
session.close()
println("End")
On the YARN UI it says that only 5 vcores are allocated, whereas I wanted 6 executor instances with 5 cores each, i.e. 30 cores in total.
On the Spark job UI it says that only 4 executors were added, whereas I requested 6 executor instances.
On the Spark Executors UI it says that those 4 executors are allocated 5 cores each, which matches my setting, but when I look at the CPU usage on the workers there is absolutely no activity.
Htop shows no CPU activity on the worker nodes.
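If it is useful, I can also log what the driver itself sees at runtime. A minimal sketch (getExecutorMemoryStatus includes the driver, so I subtract one):

// Quick runtime check from inside the app, right after building the session.
val sc = session.sparkContext
val executorCount = sc.getExecutorMemoryStatus.size - 1
println(s"Executors seen by the driver: $executorCount")
println(s"Default parallelism: ${sc.defaultParallelism}")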
I feel I am confusing all the different YARN and Spark variables. Any help would be truly appreciated.