Recently I have had to prepare some lab material for students to learn machine learning using Spark/MLlib/Scala. I am familiar with machine learning but new to Spark.

One "textbook" trick of machine learning is to add higher degree terms of original features to allow non-linear model. Let say, after I load the training data from a LIBSVM file, I want to add the square of all features in addition to the original ones. My current limited knowledge yields the below implementation:

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val data_add_square = data.map(s => {
  val tmp = sc.parallelize(s.features.toArray)
  val tmp_sqr = tmp.map(x => x*x)
  new LabeledPoint(s.label, Vectors.dense(s.features.toArray ++ tmp_sqr.collect))
})

Somehow I feel this implementation is too "heavyweight" and not the right way to do it. Can anyone shed some light on this issue?


1 Answer

It's definitely overkill to parallelize each feature vector: creating an RDD for every LabeledPoint will be extremely slow and will just kill your Spark cluster. In fact, it won't work at all, because SparkContext can't be used inside a transformation that runs on the executors.

I also suggest handling sparse and dense vectors separately: if the original vector is sparse, I strongly suggest building a sparse vector for the squared features as well.

The code below looks a bit heavy, even heavier than yours, but it should work much better for huge feature vectors:

  import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils

  val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
  val dataWithSquares = data map { labeledPoint =>

    val featuresWithSquared = labeledPoint.features match {
      case denseVector: DenseVector =>
        // Dense case: simply append the squared values after the original ones
        val original = denseVector.values
        val squared = original.map(v => v * v)
        Vectors.dense(original ++ squared)

      case sparseVector: SparseVector =>
        val builder = collection.mutable.ListBuffer.empty[(Int, Double)]
        // Collect the original values first
        sparseVector foreachActive { case (idx, v) => builder += idx -> v }
        // Collect the squared values, shifted into the second half of the new vector
        sparseVector foreachActive { case (idx, v) => builder += (idx + sparseVector.size) -> (v * v) }
        Vectors.sparse(sparseVector.size * 2, builder)
    }

    LabeledPoint(labeledPoint.label, featuresWithSquared)
  }
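
For example, one could quickly sanity-check the result and then feed it to any MLlib learner that accepts an RDD[LabeledPoint]; the LogisticRegressionWithLBFGS call below is just one illustration, not the only option:

  // Illustrative sanity check: each expanded vector should be twice
  // as long as the corresponding original vector.
  data.zip(dataWithSquares).take(3).foreach { case (orig, expanded) =>
    println(s"label=${orig.label}, " +
      s"original size=${orig.features.size}, expanded size=${expanded.features.size}")
  }

  // The expanded data can then be used like any other RDD[LabeledPoint], e.g.:
  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
  val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(dataWithSquares)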