3
votes

I am using Spark 1.5.1 with MLLib. I built a random forest model using MLLib, now use the model to do prediction. I can find the predict category (0.0 or 1.0) using the .predict function. However, I can't find the function to retrieve the probability (see the attached screenshot). I thought spark 1.5.1 random forest would provide the probability, am I missing anything here?

enter image description here

2

2 Answers

6
votes

Unfortunately the feature is not available in the older Spark MLlib 1.5.1.

You can however find it in the recent Pipeline API in Spark MLlib 2.x as RandomForestClassifier:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file, converting it to a DataFrame.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel").fit(data)

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4).fit(data)

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol(labelIndexer.getOutputCol)
  .setFeaturesCol(featureIndexer.getOutputCol)
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and forest in a Pipeline
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Fit model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)
// predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector, indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string]

predictions.show(10)
// +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
// |label|            features|indexedLabel|     indexedFeatures|rawPrediction|probability|prediction|predictedLabel|
// +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
// |  0.0|(692,[124,125,126...|         1.0|(692,[124,125,126...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
// |  0.0|(692,[124,125,126...|         1.0|(692,[124,125,126...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
// |  0.0|(692,[129,130,131...|         1.0|(692,[129,130,131...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
// |  0.0|(692,[154,155,156...|         1.0|(692,[154,155,156...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
// |  0.0|(692,[154,155,156...|         1.0|(692,[154,155,156...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
// |  0.0|(692,[181,182,183...|         1.0|(692,[181,182,183...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
// |  1.0|(692,[99,100,101,...|         0.0|(692,[99,100,101,...|    [4.0,6.0]|  [0.4,0.6]|       1.0|           0.0|
// |  1.0|(692,[123,124,125...|         0.0|(692,[123,124,125...|   [10.0,0.0]|  [1.0,0.0]|       0.0|           1.0|
// |  1.0|(692,[124,125,126...|         0.0|(692,[124,125,126...|   [10.0,0.0]|  [1.0,0.0]|       0.0|           1.0|
// |  1.0|(692,[125,126,127...|         0.0|(692,[125,126,127...|   [10.0,0.0]|  [1.0,0.0]|       0.0|           1.0|
// +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
// only showing top 10 rows

Note: This example is from the official documentation of Spark MLlib's ML - Random forest classifier.

And here is some explanation on some output columns :

  • predictionCol represents the predicted label .
  • rawPredictionCol represents a Vector of length # classes, with the counts of training instance labels at the tree node which makes the prediction (available for Classification only).
  • probabilityCol represents the probability Vector of length # classes equal to rawPrediction normalized to a multinomial distribution (available with Classification only).
4
votes

You can't directly get the classification probabilities but it is relatively easy to calculate it yourself. RandomForest is an ensemble of trees and its output probability is the majority vote of these trees divided by the total number of trees.

Since the RandomForestModel in MLib gives you the trained trees it is easy to do it yourself. The following code gives the probability for the binary classification problem. Its generalization to multi-class classification is straightforward.

  def predict(points: RDD[LabeledPoint], model: RandomForestModel) = {
    val numTrees = model.trees.length
    val trees = points.sparkContext.broadcast(model.trees)
    points.map { point =>
    trees.value
    .map(_.predict(point.features))
    .sum / numTrees
  }

}

for multi-class case you only need to replace map with .map(_.predict(point.features)-> 1.0) and group by key instead of sum and finally take the max of values.