5
votes

I have an RDD in Spark where the objects are based on a case class:

ExampleCaseClass(user: User, stuff: Stuff)

I want to use Spark's ML pipeline, so I convert this to a Spark DataFrame. As part of the pipeline, I want to transform one of the columns into a column whose entries are vectors. Since I want the length of that vector to vary with the model, it should be built into the pipeline as part of the feature transformation.

So I attempted to define a Transformer as follows:

class MyTransformer extends Transformer {

  val uid = ""
  val num: IntParam = new IntParam(this, "", "")

  def setNum(value: Int): this.type = set(num, value)
  setDefault(num -> 50)

  def transform(df: DataFrame): DataFrame = {
    ...
  }

  def transformSchema(schema: StructType): StructType = {
    val inputFields = schema.fields
    StructType(inputFields :+ StructField("colName", ???, true))
  }

  def copy (extra: ParamMap): Transformer = defaultCopy(extra)

}

How do I specify the DataType of the resulting field (i.e. fill in the ???)? It will be a Vector of some simple class (Boolean, Int, Double, etc.). It seems VectorUDT might have worked, but that's private to Spark. Since any RDD can be converted to a DataFrame, any case class can be converted to a custom DataType. However, I can't figure out how to do this conversion manually; otherwise I could apply it to some simple case class wrapping the vector.

Furthermore, if I specify a vector type for the column, will VectorAssembler correctly process the vector into separate features when I go to fit the model?

I'm still new to Spark, and especially to the ML Pipeline, so I'd appreciate any advice.

1) Vectors in Spark support only the Double type (these are not the same as Scala's Vector, see stackoverflow.com/q/31255756/1560062). 2) VectorUDT is not private. 3) Not every RDD can be converted to a DataFrame, or at least not directly, and not every case class can automatically be used as a Dataset (DataFrame) element. - zero323
And not all RDDs can be converted to DataFrames. Dirty RDD - Alberto Bonsanto
4) "if I specify a vector type for the column, will VectorAssembler correctly process the vector into separate features": I don't understand the question. The assembler just concatenates columns (and metadata, if present). - zero323
What I really want to do is create many more columns (not just a long vector in one column), but I couldn't figure out how to do that either without being extremely inefficient. I should only need to run through the data once to create all these columns, and right now I can only see how to do it with a loop. - mkreisel
That might explain my comment about VectorAssembler, since I really want each element of that vector to be its own feature. - mkreisel
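
To make the two points in this thread concrete, here is a minimal sketch rather than a definitive recipe. It assumes a hypothetical DataFrame with a vector column named vec and a numeric column named x; the object and column names are made up for illustration. The expand helper relies on vector_to_array, which assumes Spark 3.0 or later; on older versions a UDF returning an array would be needed instead.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.functions.vector_to_array // assumes Spark 3.0 or later
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper object; the column names "vec" and "x" are assumptions.
object VectorColumnsSketch {

  // VectorAssembler concatenates its inputs: a length-k vector column plus one
  // numeric column yields a single vector column of length k + 1.
  def assemble(df: DataFrame): DataFrame =
    new VectorAssembler()
      .setInputCols(Array("vec", "x"))
      .setOutputCol("features")
      .transform(df)

  // One Double column per vector element, all requested in a single select,
  // so no per-column loop over the data is needed.
  def expand(df: DataFrame, n: Int): DataFrame = {
    val arr = vector_to_array(col("vec"))
    val elementCols = (0 until n).map(i => arr(i).as(s"vec_$i"))
    df.select((col("*") +: elementCols): _*)
  }
}
```

For a row where vec is Vectors.dense(1.0, 2.0) and x is 3.0, assemble produces a features vector of [1.0, 2.0, 3.0], and expand(df, 2) adds the columns vec_0 and vec_1. Since the assembler already flattens vector inputs element by element, a single vector column is usually enough for model fitting.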

2 Answers

6
votes
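import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types.{StructField, StructType}

// Append a nullable vector column to the input schema.
def transformSchema(schema: StructType): StructType = {
  val inputFields = schema.fields
  StructType(inputFields :+ StructField("colName", VectorType, true))
}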

In Spark 2.1, SQLDataTypes.VectorType makes VectorUDT publicly available (the object itself is annotated @Since("2.0.0")):

package org.apache.spark.ml.linalg

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.types.DataType

/**
 * :: DeveloperApi ::
 * SQL data types for vectors and matrices.
 */
@Since("2.0.0")
@DeveloperApi
object SQLDataTypes {

  /** Data type for [[Vector]]. */
  val VectorType: DataType = new VectorUDT

  /** Data type for [[Matrix]]. */
  val MatrixType: DataType = new MatrixUDT
}
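
For context, here is a rough sketch of how the transformSchema above could sit inside a complete Transformer on Spark 2.x or later, where transform takes a Dataset[_] rather than a DataFrame. The input column name x and the featurization (powers of x) are placeholders invented for illustration, not something taken from the question; the parameter name and doc string passed to IntParam are likewise assumptions.

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{StructField, StructType}

class MyTransformer(override val uid: String) extends Transformer {

  // A unique id lets defaultCopy and the pipeline tell instances apart.
  def this() = this(Identifiable.randomUID("myTransformer"))

  // Length of the output vector, configurable per model.
  val num: IntParam = new IntParam(this, "num", "length of the output vector")
  def setNum(value: Int): this.type = set(num, value)
  setDefault(num -> 50)

  override def transform(dataset: Dataset[_]): DataFrame = {
    val n = $(num)
    // Placeholder featurization (an assumption): x, x^2, ..., x^n.
    // Assumes a Double input column named "x".
    val toVector = udf { (x: Double) =>
      Vectors.dense(Array.tabulate(n)(i => math.pow(x, i + 1)))
    }
    dataset.withColumn("colName", toVector(col("x")))
  }

  // Declare the new column as a vector so later stages can validate the schema.
  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField("colName", VectorType, nullable = true))

  override def copy(extra: ParamMap): MyTransformer = defaultCopy(extra)
}
```

A call such as new MyTransformer().setNum(3).transform(df) would then add a colName column holding vectors of length 3, and the stage can be placed in a Pipeline like any built-in transformer.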
3
votes
import org.apache.spark.mllib.linalg.{Vector, Vectors}

case class MyVector(vector: Vector)
val vectorDF = Seq(
  MyVector(Vectors.dense(1.0,3.4,4.4)),
  MyVector(Vectors.dense(5.5,6.7))
).toDF

vectorDF.printSchema
root
 |-- vector: vector (nullable = true)

println(vectorDF.schema.fields(0).dataType.prettyJson)
{
  "type" : "udt",
  "class" : "org.apache.spark.mllib.linalg.VectorUDT",
  "pyClass" : "pyspark.mllib.linalg.VectorUDT",
  "sqlType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "type",
      "type" : "byte",
      "nullable" : false,
      "metadata" : { }
    }, {
      "name" : "size",
      "type" : "integer",
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "indices",
      "type" : {
        "type" : "array",
        "elementType" : "integer",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "values",
      "type" : {
        "type" : "array",
        "elementType" : "double",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    } ]
  }
}
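
One way to read the output above is that the vector's DataType can be taken from a DataFrame's schema and reused when building a StructField by hand. The sketch below illustrates that idea under stated assumptions: the object and method names are hypothetical, and it reuses the colName field name from the question. Note that it yields the org.apache.spark.mllib.linalg.VectorUDT shown in the JSON, which is a different class from the org.apache.spark.ml.linalg one used by Spark 2.x ML pipeline stages.

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructField, StructType}

// Defined at the top level so Spark can derive an encoder for it.
case class MyVector(vector: Vector)

// Hypothetical helper object, for illustration only.
object SchemaSketch {

  // Build a one-row DataFrame and read the column's DataType from its schema.
  def vectorDataType(spark: SparkSession): DataType = {
    import spark.implicits._
    val vectorDF = Seq(MyVector(Vectors.dense(1.0, 3.4, 4.4))).toDF
    vectorDF.schema("vector").dataType
  }

  // Reuse the recovered type to fill in the ??? from the question.
  def appendVectorField(schema: StructType, vectorType: DataType): StructType =
    StructType(schema.fields :+ StructField("colName", vectorType, nullable = true))
}
```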