
We are trying to implement a simple Spark job which reads a CSV file (1 row of data) and makes a prediction using a prebuilt random forest model object. This job doesn't include any data preprocessing or data manipulation.

We are running Spark in standalone mode with the application running locally. The configuration is as follows:

  • RAM: 8GB
  • Memory: 40GB
  • No. of cores: 2
  • Spark version: 1.5.2
  • Scala version: 2.10.5
  • Input file size: 1KB (1 row of data)
  • Model file size: 1,595 KB (400-tree random forest)

Currently, running the application through spark-submit takes about 13 seconds. However, the run time is a huge concern for this application, hence:

  1. Is there a way to optimize the code to bring the run time down to 1 or 2 seconds? (high priority)

  2. We noticed that the actual code execution takes about 7-8 seconds, while booting up and setting up the contexts takes about 5-6 seconds. Is there a way to keep the Spark context running between spark-submit runs?

Here is the application code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object RF_model_App {
  def main(args: Array[String]) {

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.feature.VectorAssembler
import sqlContext.implicits._
val Test = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("/home/ubuntu/Test.csv")
Test.registerTempTable("Test")
val model_L1 = sc.objectFile[RandomForestClassificationModel]("/home/ubuntu/RF_L1.model").first()

val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val featureDf = Test
  .withColumn("id1", toInt(Test("id1")))
  .withColumn("id2", toInt(Test("id2")))
  .withColumn("id3", toInt(Test("id3")))
  .withColumn("id4", toInt(Test("id4")))
  .withColumn("feature3", toInt(Test("feature3")))
  .withColumn("feature9", toInt(Test("feature9")))
  .withColumn("feature10", toInt(Test("feature10")))
  .withColumn("feature12", toInt(Test("feature12")))
  .withColumn("feature14", toDouble(Test("feature14")))
  .withColumn("feature15", toDouble(Test("feature15")))
  .withColumn("feature16", toInt(Test("feature16")))
  .withColumn("feature17", toDouble(Test("feature17")))
  .withColumn("feature18", toInt(Test("feature18")))

val feature4_index = new StringIndexer()  .setInputCol("feature4")  .setOutputCol("feature4_index")
val feature6_index = new StringIndexer()  .setInputCol("feature6")  .setOutputCol("feature6_index")
val feature11_index = new StringIndexer()  .setInputCol("feature11")  .setOutputCol("feature11_index")
val feature8_index = new StringIndexer()  .setInputCol("feature8")  .setOutputCol("feature8_index")
val feature13_index = new StringIndexer()  .setInputCol("feature13")  .setOutputCol("feature13_index")
val feature2_index = new StringIndexer()  .setInputCol("feature2")  .setOutputCol("feature2_index")
val feature5_index = new StringIndexer()  .setInputCol("feature5")  .setOutputCol("feature5_index")
val feature7_index = new StringIndexer()  .setInputCol("feature7")  .setOutputCol("feature7_index")
val vectorizer_L1 = new VectorAssembler()
  .setInputCols(Array("feature3", "feature2_index", "feature6_index", "feature4_index", "feature8_index", "feature7_index", "feature5_index", "feature10", "feature9", "feature12", "feature11_index", "feature13_index", "feature14", "feature15", "feature18", "feature17", "feature16"))
  .setOutputCol("features_L1")
val feature_pipeline_L1 = new Pipeline()
  .setStages(Array(feature4_index, feature6_index, feature11_index, feature8_index, feature13_index, feature2_index, feature5_index, feature7_index, vectorizer_L1))
val testPredict= feature_pipeline_L1.fit(featureDf).transform(featureDf)
val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1))
val getid2 = udf((v: Int) => v)
val L1_output = model_L1.transform(testPredict).select(getid2($"id2") as "id2",getid2($"prediction") as "L1_prediction",getPOne($"probability") as "probability")

L1_output.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").mode("overwrite").save("/home/L1_output")

  }
}

1 Answer


Let's start with the things which are simply wrong:

  • The feature indexing mechanism you use is simply incorrect. StringIndexer assigns indices based on the distribution of the data, so the same record will get a different encoding depending on the other records. You should use the same StringIndexerModel (-s) for training, testing and predictions (a sketch follows this list).
  • val getid2 = udf((v: Int) => v) is just an expensive identity.
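
For illustration, a minimal sketch of reusing fitted stages, reusing the names from the question code. It assumes a hypothetical training DataFrame trainDf with the same schema, illustrative paths, and that the fitted stages are Java-serializable the same way the RF model in the question already appears to be:

import org.apache.spark.ml.{Pipeline, PipelineModel}

// Fit the feature pipeline (the indexers + assembler from the question) ONCE, on the
// training data, so every categorical level gets a stable index.
val featureModel: PipelineModel = feature_pipeline_L1.fit(trainDf)

// Persist the fitted stages the same way the RF model was persisted (plain object files;
// built-in ML pipeline persistence only arrived in later Spark versions).
sc.parallelize(Seq(featureModel), 1).saveAsObjectFile("/home/ubuntu/feature_pipeline.model")

// At prediction time: load and transform only, no re-fitting on the single test row.
val loadedFeatureModel = sc.objectFile[PipelineModel]("/home/ubuntu/feature_pipeline.model").first()
val testFeatures = loadedFeatureModel.transform(featureDf)
val predictions = model_L1.transform(testFeatures)

With the indexers fitted up front, the per-request work is reduced to a couple of transform calls, and the encodings stay consistent with the ones the model was trained on.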

Persistent SparkContext

There are multiple tools which keep a persistent context running, including spark-jobserver and Livy.

Finally, you can simply use Spark Streaming and process the data as it comes in.
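
A minimal sketch of that approach (the watched directory and batch interval are assumptions, and the per-batch scoring logic is only indicated by a comment):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("RF_scoring_stream")
val ssc = new StreamingContext(conf, Seconds(1))

// Every new file dropped into the watched directory becomes an RDD of its lines.
val lines = ssc.textFileStream("/home/ubuntu/incoming")

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Parse the CSV lines into a DataFrame with the expected schema, apply the
    // pre-fitted feature pipeline and model, and write the result -- the same
    // scoring logic as the batch job, minus the per-run context start-up cost.
  }
}

ssc.start()
ssc.awaitTermination()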

Shuffling

You are also using repartition(1) to create a single partition and, I suppose, a single-file CSV output. This operation is quite expensive: by definition it reshuffles the data in the RDD randomly to create either more or fewer partitions and balance it across them, and this always shuffles all data over the network.
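
If a single output file is really required, a cheaper alternative (using the same writer options as in the question) is usually coalesce, which only merges existing partitions instead of performing a full reshuffle:

// coalesce(1) merges the existing partitions down to one without a full random
// reshuffle, so it is the cheaper way to end up with a single output file.
L1_output.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .mode("overwrite")
  .save("/home/L1_output")

With one row of input the difference is negligible here, but on real data it matters.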

Other considerations:

If latency is important and you are only using a single, low-performance machine, don't use Spark at all. There is nothing to gain here; a good local library can do a much better job in a case like this.

Notes:

We don't have access to your data or your hardware, so any requirement like "reduce the time to 7s" is completely meaningless.