11 votes

I've been using org.apache.spark.ml.Pipeline for machine learning tasks. It is particularly important to know the actual probabilities instead of just a predicted label, and I am having difficulty getting them. Here I am doing a binary classification task with random forest. The class labels are "Yes" and "No". I would like to output the probability for the label "Yes". The probabilities are stored in a DenseVector as the pipeline output, such as [0.69, 0.31], but I don't know which one corresponds to "Yes" (0.69 or 0.31?). I guess there should be some way to retrieve it from labelIndexer?
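For example, I imagine the mapping could be read off the fitted indexer, something like this (just my guess, untested; labelIndexer is the fitted StringIndexerModel from the training code below):

// labels(i) should be the original label behind position i of the probability vector
val labels: Array[String] = labelIndexer.labels
val yesIndex = labels.indexOf("Yes")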

Here is my code for training the model:

val sc = new SparkContext(new SparkConf().setAppName("ML").setMaster("local"))
val data = .... // load data from file
val df = sqlContext.createDataFrame(data).toDF("label", "features")
val labelIndexer = new StringIndexer()
                      .setInputCol("label")
                      .setOutputCol("indexedLabel")
                      .fit(df)

val featureIndexer = new VectorIndexer()
                        .setInputCol("features")
                        .setOutputCol("indexedFeatures")
                        .setMaxCategories(2)
                        .fit(df)


// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))


// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")
  .setMaxDepth(4)
  .setMaxBins(32)

// Create pipeline
val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model
val model = pipeline.fit(trainingData)

// Save model
sc.parallelize(Seq(model), 1).saveAsObjectFile("/my/path/pipeline")
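On Spark 2.x, the built-in ML persistence API could be used instead of saveAsObjectFile (a minimal alternative sketch, assuming Spark 2.x):

// Built-in persistence for fitted ML pipelines (Spark 2.x)
model.write.overwrite().save("/my/path/pipeline")
// later: val model = org.apache.spark.ml.PipelineModel.load("/my/path/pipeline")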

Then I load the pipeline and make predictions on new data; here is the relevant code:

// Ignoring loading data part

// Create DF
val testdf = sqlContext.createDataFrame(testData).toDF("features", "line")
// Load pipeline
val model = sc.objectFile[org.apache.spark.ml.PipelineModel]("/my/path/pipeline").first

// My question comes here: how do I extract the probability corresponding to the class label "Yes"?
// This is my attempt. I would like to output the probability for the label "Yes" along with the predicted label.
// The probabilities are stored in a DenseVector, but I don't know which element corresponds to "Yes". Something like this:
val predictions = model.transform(testdf).select("probability").map(row => row.getAs[DenseVector]("probability"))
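I suspect the index can be recovered from the fitted StringIndexer inside the loaded pipeline, along these lines (untested sketch; assumes stage 0 is the StringIndexerModel, matching the setStages order above, and Spark 1.x mllib vectors):

import org.apache.spark.ml.feature.StringIndexerModel
import org.apache.spark.sql.functions.{col, udf}

// Recover the label ordering from the fitted indexer inside the loaded pipeline
val indexer = model.stages(0).asInstanceOf[StringIndexerModel]
val yesIndex = indexer.labels.indexOf("Yes")

// Pick the "Yes" element out of each probability vector
// (on Spark 2.x the vector type is org.apache.spark.ml.linalg.Vector instead)
val probOfYes = udf((v: org.apache.spark.mllib.linalg.Vector) => v(yesIndex))
val withYesProb = model.transform(testdf).withColumn("prob_yes", probOfYes(col("probability")))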

Reference regarding the probabilities and labels for RF: http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forests

What do you mean by "I would like to output probability for label "1" and predicted label. The probabilities are stored in a DenseVector as the pipeline output, but I don't know which one is corresponding to "1""? – eliasah
Hi, I've updated the description. Basically I want to output the probability corresponding to the label "Yes". – Qing
@Qing How did you solve this problem? – Meethu Mathew
@Qing: did you find your answer? We have a vector of 2 probabilities. Which probability corresponds to which class of the label? Which probability corresponds to "yes" and which to "no"? – Anneso

2 Answers

0 votes

Do you mean that you want to extract the probability of the positive label from the DenseVector? If so, you can create a UDF to extract it. In the DenseVector produced for binary classification, the first element is the probability of indexed label "0" and the second element is the probability of indexed label "1".

// UDF returning the probability of indexed label "1" (second element of the vector);
// assumes org.apache.spark.sql.functions.udf and the SQL implicits are imported
val getOne = udf((v: org.apache.spark.ml.linalg.Vector) => v(1))
val prediction = pipelineModel.transform(result)
val pre = prediction.select(getOne($"probability").as("probability"))
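Keep in mind that "0" and "1" here are the indexed labels: StringIndexer assigns index 0 to the most frequent original label. Assuming the fitted labelIndexer from the question is in scope, you can print the mapping to confirm which position belongs to which label:

// Position i of the probability vector corresponds to labelIndexer.labels(i)
labelIndexer.labels.zipWithIndex.foreach { case (label, i) =>
  println(s"probability($i) = P(label = $label)")
}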
0 votes

You're on the right track with retrieving it from the label indexer.

See comments in the code for more information.

This example works with Scala 2.11.8 and Spark 2.2.1.

import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Column, SparkSession}

object Example {

  case class Record(features: org.apache.spark.ml.linalg.Vector)

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder
      .appName("Example")
      .config(new SparkConf().setMaster("local[2]"))
      .getOrCreate

    val sc = spark.sparkContext

    import spark.implicits._

    val data = sc.parallelize(
      Array(
        (Vectors.dense(0.9, 0.6), "n"),
        (Vectors.dense(0.1, 0.1), "y"),
        (Vectors.dense(0.2, 0.15), "y"),
        (Vectors.dense(0.8, 0.9), "n"),
        (Vectors.dense(0.3, 0.4), "y"),
        (Vectors.dense(0.5, 0.5), "n"),
        (Vectors.dense(0.6, 0.7), "n"),
        (Vectors.dense(0.3, 0.3), "y"),
        (Vectors.dense(0.3, 0.3), "y"),
        (Vectors.dense(-0.5, -0.1), "dunno"),
        (Vectors.dense(-0.9, -0.6), "dunno")
      )).toDF("features", "label")

    // NOTE: you're fitting StringIndexer to all your data.
    // The StringIndexer orders the labels by label frequency.
    // In this example there are 5 "y" labels, 4 "n" labels
    // and 2 "dunno" labels, so the probability columns will be
    // listed in the following order: "y", "n", "dunno".
    // You can play with label frequencies to convince yourself
    // that it sorts labels by frequency in provided data.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("label_indexed")
      .fit(data)

    val indexToLabel = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predicted_label")
      .setLabels(labelIndexer.labels)

    // Here I use logistic regression, but the exact algorithm doesn't
    // matter in this case.
    val lr = new LogisticRegression()
      .setFeaturesCol("features")
      .setLabelCol("label_indexed")
      .setPredictionCol("prediction")

    val pipeline = new Pipeline().setStages(Array(
      labelIndexer,
      lr,
      indexToLabel
    ))

    val model = pipeline.fit(data)

    // Prepare test set
    val toPredictDf = sc.parallelize(Array(
      Record(Vectors.dense(0.1, 0.5)),
      Record(Vectors.dense(0.8, 0.8)),
      Record(Vectors.dense(-0.2, -0.5))
    )).toDF("features")

    // Make predictions
    val results = model.transform(toPredictDf)

    // The column containing probabilities has to be converted from Vector to Array
    val vecToArray = udf( (xs: org.apache.spark.ml.linalg.Vector) => xs.toArray )
    val dfArr = results.withColumn("probabilityArr" , vecToArray($"probability") )

    // labelIndexer.labels contains the list of your labels.
    // It is zipped with index to match the label name with
    // related probability found in probabilities array.
    // In other words, the label labelIndexer.labels.apply(idx)
    // matches the probability col("probabilityArr").getItem(idx).
    // See also: https://stackoverflow.com/a/49917851
    val probColumns = labelIndexer.labels.zipWithIndex.map {
      case (alias, idx) => (alias, col("probabilityArr").getItem(idx).as(alias))
    }

    // 'probColumns' is of type Array[(String, Column)] so now 
    // concatenate these Column objects to DataFrame containing predictions
    // See also: https://stackoverflow.com/a/43494322
    val columnsAdded = probColumns.foldLeft(dfArr) { case (d, (colName, colContents)) =>
      if (d.columns.contains(colName)) {
        d
      } else {
        d.withColumn(colName, colContents)
      }
    }

    columnsAdded.show()
  }
}

Once you run this code, it will produce the following data frame:

+-----------+---------------+--------------------+--------------------+--------------------+
|   features|predicted_label|                   y|                   n|               dunno|
+-----------+---------------+--------------------+--------------------+--------------------+
|  [0.1,0.5]|              y|  0.9999999999994298|5.702468131669394...|9.56953780171369E-19|
|  [0.8,0.8]|              n|5.850695258713685...|                 1.0|4.13416875406573E-81|
|[-0.2,-0.5]|          dunno|1.207908506571593...|8.157018363627128...|  0.9998792091493428|
+-----------+---------------+--------------------+--------------------+--------------------+

Columns y, n and dunno are the columns that we have just added to the ordinary output of Spark's ML pipeline.
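If you only need the probability of a single label, say "y", you can select it straight from this result (a short usage sketch, reusing the col import from the example):

// Keep just the features and the probability of label "y"
columnsAdded.select(col("features"), col("y").as("p_y")).show()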