0
votes

I am using Spark MLLib to perform K-means clustering on AWS EMR. The data set is on the order of 10^6 rows with 9 feature columns. The size of instance I am using has 8vCPU and 32GB of memory.

I expected that as I increase the number of nodes on the cluster, that I would have increased performance(decreased execution time) from Spark, however I am getting the opposite results.

I get WORSE performance(higher execution time) with MORE worker nodes/instances than I do with a single worker node. I have had the same results with clusters of 5, 10, and 15 worker nodes; as the number of nodes increase, there is a decrease in performance. I have attempted to vary the partitions (spark.sql.shuffle.partitions) and used various configurations of executor cores, number of executors, and executor memory.

My code is below (number of executors is for 10 worker nodes):

spark-shell --executor-cores 3 num-executors 20 --executor-memory 10G

import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._ 
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.ml.linalg.{Vector, Vectors, DenseVector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector}

sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc) 

//dataset is loaded in from Phoenix table and set as featureDf6
//dataset is made up of all numerical values (DOUBLE)

val columns = Array("DUR","AVG_AMP","AVG_POW","PAPR","SNR","SNR_DB","BW_3DB","BW_10DB","BWNE")    
val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("featuresin")
val df = assembler.transform(featureDf6)

val scaler = new MinMaxScaler().setInputCol("featuresin").setOutputCol("features").setMin(-1).setMax(1)
val scalerModel = scaler.fit(df)
val scaledData = scalerModel.transform(df)

val kmeans = new KMeans().setK(14).setSeed(1L).setMaxIter(1000)
val model = kmeans.fit(scaledData)
1
How big is your data? Your number of nodes/ executors should be in accordance with your data size else spark will be spending time just shuffling the data around, which involves network latencies too - Neha Jirafe
I have ran 70MB, 1.6GB, and 16GB datasets, with varying numbers of nodes (1,5,10,15) and scale the number of executors to 2 per node. - Christopher Ferris

1 Answers

0
votes

I found the cause of the issue was the method in which Spark was reading in the data from Phoenix/HBase. When I uploaded the dataset directly into Spark, the results were as expected and as the nodes increased, the execution time went down. I will post another question to identify the error in the process for reading from Phoenix.