5 votes

I have an RDD in Spark where the objects are based on a case class:

ExampleCaseClass(user: User, stuff: Stuff)

I want to use Spark's ML pipeline, so I convert this to a Spark data frame. As part of the pipeline, I want to transform one of the columns into a column whose entries are vectors. Since I want the length of that vector to vary with the model, it should be built into the pipeline as part of the feature transformation.

So I attempted to define a Transformer as follows:

class MyTransformer extends Transformer {

  val uid = ""
  val num: IntParam = new IntParam(this, "", "")

  def setNum(value: Int): this.type = set(num, value)
  setDefault(num -> 50)

  def transform(df: DataFrame): DataFrame = {
    ...
  }

  def transformSchema(schema: StructType): StructType = {
    val inputFields = schema.fields
    StructType(inputFields :+ StructField("colName", ???, true))
  }

  def copy(extra: ParamMap): Transformer = defaultCopy(extra)

}

How do I specify the DataType of the resulting field (i.e. fill in the ???)? It will be a Vector of some simple class (Boolean, Int, Double, etc). It seems VectorUDT might have worked, but that's private to Spark. Since any RDD can be converted to a DataFrame, any case class can be converted to a custom DataType. However I can't figure out how to manually do this conversion, otherwise I could apply it to some simple case class wrapping the vector.

Furthermore, if I specify a vector type for the column, will VectorAssembler correctly process the vector into separate features when I go to fit the model?

I'm still new to Spark and especially to the ML Pipeline, so I appreciate any advice.

1) Vectors in Spark support only the Double type (they are not the same as Scala's Vector, see stackoverflow.com/q/31255756/1560062). 2) VectorUDT is not private. 3) Not every RDD can be converted to a DataFrame, or at least not directly, and not every case class can automatically be used as a Dataset (or DataFrame) element. – zero323
And not all RDDs can be converted to DataFrames. Dirty RDD – Alberto Bonsanto
4) "if I specify a vector type for the column, will VectorAssembler correctly process the vector into separate features" – I don't understand what the question is. The assembler just concatenates columns (and metadata, if present). – zero323
What I really want to do is create many more columns (not just a long vector in one column), but I couldn't figure out how to do that either without being extremely inefficient. I should only need to run through the data once to create all these columns, and right now I can only see how to do it with a loop. – mkreisel
That might explain my comment about VectorAssembler, since I really want each element of that vector to be its own feature. – mkreisel
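To illustrate zero323's point about VectorAssembler, here is a hedged sketch (not from the thread; the column names and local SparkSession setup are made up for the example). The assembler only concatenates its input columns into one flat vector column; it does not expose the vector's elements as independent features:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Sketch assuming a local SparkSession (in spark-shell, `spark` already exists).
val spark = SparkSession.builder.master("local[1]").appName("assembler-sketch").getOrCreate()
import spark.implicits._

// A scalar column "x" and a 2-element vector column "vec".
val df = Seq((1.0, Vectors.dense(2.0, 3.0))).toDF("x", "vec")

// VectorAssembler concatenates "x" and the two elements of "vec" into a
// single 3-element "features" column -- still one column, not three features.
val assembled = new VectorAssembler()
  .setInputCols(Array("x", "vec"))
  .setOutputCol("features")
  .transform(df)
```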

2 Answers

6 votes
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

def transformSchema(schema: StructType): StructType = {
  val inputFields = schema.fields
  StructType(inputFields :+ StructField("colName", VectorType, true))
}

In Spark 2.1, VectorType makes VectorUDT publicly available:

package org.apache.spark.ml.linalg

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.types.DataType

/**
 * :: DeveloperApi ::
 * SQL data types for vectors and matrices.
 */
@Since("2.0.0")
@DeveloperApi
object SQLDataTypes {

  /** Data type for [[Vector]]. */
  val VectorType: DataType = new VectorUDT

  /** Data type for [[Matrix]]. */
  val MatrixType: DataType = new MatrixUDT
}
3 votes
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// `toDF` needs the SQL implicits (available automatically in spark-shell;
// otherwise: import spark.implicits._)
case class MyVector(vector: Vector)
val vectorDF = Seq(
  MyVector(Vectors.dense(1.0, 3.4, 4.4)),
  MyVector(Vectors.dense(5.5, 6.7))
).toDF

vectorDF.printSchema
root
 |-- vector: vector (nullable = true)

println(vectorDF.schema.fields(0).dataType.prettyJson)
{
  "type" : "udt",
  "class" : "org.apache.spark.mllib.linalg.VectorUDT",
  "pyClass" : "pyspark.mllib.linalg.VectorUDT",
  "sqlType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "type",
      "type" : "byte",
      "nullable" : false,
      "metadata" : { }
    }, {
      "name" : "size",
      "type" : "integer",
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "indices",
      "type" : {
        "type" : "array",
        "elementType" : "integer",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "values",
      "type" : {
        "type" : "array",
        "elementType" : "double",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    } ]
  }
}
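As a hedged follow-up (assuming Spark ≥ 2.0, where the first answer's `SQLDataTypes.VectorType` exists in the `ml` package; the case class and session setup here are made up for the example): the same schema inspection shows the column's DataType equals the public `VectorType` constant, so it can be used directly in `transformSchema`:

```scala
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession

// Sketch assuming a local SparkSession (in spark-shell, `spark` already exists).
val spark = SparkSession.builder.master("local[1]").appName("vectortype-check").getOrCreate()
import spark.implicits._

// Hypothetical wrapper case class, analogous to MyVector above but using
// the ml (not mllib) linalg package.
case class MyMlVector(vector: Vector)
val mlDF = Seq(MyMlVector(Vectors.dense(1.0, 2.0))).toDF

// The column's DataType is the ml VectorUDT, which compares equal to
// the public VectorType constant.
val isVectorType = mlDF.schema.fields(0).dataType == VectorType
```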