I am using Spark MLlib to perform K-means clustering on AWS EMR. The dataset is on the order of 10^6 rows with 9 feature columns. The instance type I am using has 8 vCPUs and 32 GB of memory.
I expected that as I increased the number of nodes in the cluster, I would get better performance (lower execution time) from Spark; however, I am seeing the opposite.
I get WORSE performance (higher execution time) with MORE worker nodes/instances than I do with a single worker node. I have seen the same behaviour with clusters of 5, 10, and 15 worker nodes: as the number of nodes increases, performance decreases. I have tried varying the number of partitions (spark.sql.shuffle.partitions) and used various combinations of executor cores, number of executors, and executor memory.
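For example, varying the shuffle partitions comes down to a call like the following on the SQLContext that appears in the code further below (the value 200 here is purely illustrative; I tried a range of values):

sqlContext.setConf("spark.sql.shuffle.partitions", "200")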
My code is below (the number of executors shown is for the 10-worker-node cluster):
spark-shell --executor-cores 3 --num-executors 20 --executor-memory 10G
import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.sql.{SparkSession, SQLContext, DataFrame}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.types.{StructField, StructType}
sc.stop()
val configuration = new Configuration()
val sc = new SparkContext("local", "phoenix-load")
val sqlContext = new SQLContext(sc)
// The dataset is loaded from a Phoenix table into featureDf6
// All columns are numeric (DOUBLE) values
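// For completeness, a sketch of how featureDf6 can be loaded with the phoenix-spark
// connector; the table name and zkUrl below are placeholders, not the actual values used:
val featureDf6 = sqlContext.read
  .format("org.apache.phoenix.spark")
  .option("table", "FEATURE_TABLE")
  .option("zkUrl", "zk-host:2181")
  .load()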
val columns = Array("DUR","AVG_AMP","AVG_POW","PAPR","SNR","SNR_DB","BW_3DB","BW_10DB","BWNE")
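// Assemble the 9 numeric feature columns into a single vector column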
val assembler = new VectorAssembler().setInputCols(columns).setOutputCol("featuresin")
val df = assembler.transform(featureDf6)
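// Scale each feature to the range [-1, 1]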
val scaler = new MinMaxScaler().setInputCol("featuresin").setOutputCol("features").setMin(-1).setMax(1)
val scalerModel = scaler.fit(df)
val scaledData = scalerModel.transform(df)
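// Run k-means with k = 14, a fixed seed, and up to 1000 iterations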
val kmeans = new KMeans().setK(14).setSeed(1L).setMaxIter(1000)
val model = kmeans.fit(scaledData)
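In case it helps with diagnosing the scaling behaviour, here is a minimal sketch of how the effective parallelism can be checked from the same shell session (nothing below is specific to my job):

println(s"master = ${sc.master}, defaultParallelism = ${sc.defaultParallelism}")
println(s"input partitions = ${scaledData.rdd.getNumPartitions}")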