
We have a Spark ML pipeline (Spark 2.0.1) consisting of multiple feature transformation stages.

Some of these stages are OneHot encoders. The idea: encode an integer-based category into n independent binary features.

Training the pipeline model and using it to predict works fine. However, storing the trained pipeline model and reloading it causes issues:

The stored 'trained' OneHot encoder does not keep track of how many categories there are. When the loaded model is used to predict, it re-determines the number of categories from the new data, so the training feature space and the prediction feature space end up with different sizes (dimensions). See the example code below, as run in a Zeppelin notebook:

import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.PipelineModel


// Specifying two test samples, one with class 5 and one with class 3. This is OneHot encoded into 5 boolean features (sparse vector)
// Adding a 'filler' column because createDataFrame doesn't like single-column sequences and this is the easiest way to demo it ;)
val df = spark.createDataFrame(Seq((5, 1), (3, 1))).toDF("class", "filler")

val enc = new OneHotEncoder()
  .setInputCol("class")
  .setOutputCol("class_one_hot")

val pipeline = new Pipeline()
  .setStages(Array(enc))

val model = pipeline.fit(df)
model.transform(df).show()

/*
+-----+------+-------------+
|class|filler|class_one_hot|
+-----+------+-------------+
|    5|     1|(5,[],[])    |
|    3|     1|(5,[3],[1.0])|
+-----+------+-------------+

Note: Vector of size 5
*/

model.write.overwrite().save("s3a://one-hot")

val loadedModel = PipelineModel.load("s3a://one-hot")

// When using the trained model, our input consists of a single row
// (prediction-engine style). The provided category is 3
val df2 = spark.createDataFrame(Seq((3, 1))).toDF("class", "output")
loadedModel.transform(df2).show()

/*
+-----+------+-------------+
|class|output|class_one_hot|
+-----+------+-------------+
|    3|     1|(3,[],[])    |
+-----+------+-------------+

Note: Incompatible vector of size 3
*/

I'd prefer not to write my own OneHot encoder that DOES support this serialization. Are there any alternatives I can use out of the box?


1 Answer


Spark >= 2.3

Spark 2.3 introduces OneHotEncoderEstimator (to be renamed OneHotEncoder in Spark 3.0), which can be used directly and supports multiple input columns.
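For illustration, a minimal sketch against the question's data (column names carried over from the question; fit() is the step that learns and persists the category sizes, so save/load keeps the output dimension stable):

import org.apache.spark.ml.feature.OneHotEncoderEstimator

// Unlike the old OneHotEncoder, this is an Estimator: fit() produces a
// model that stores the category sizes, so they survive save and load
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("class"))
  .setOutputCols(Array("class_one_hot"))

val encoderModel = encoder.fit(df)   // category sizes learned here
encoderModel.transform(df2).show()   // same output dimension as in training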

Spark < 2.3

You don't use OneHotEncoder as it is intended to be used. OneHotEncoder is a Transformer, not an Estimator. It doesn't store any information about the levels but depends on the Column metadata to determine the output dimensions. If metadata is missing, as in your case, it uses a fallback strategy and assumes there are max(input_column) levels. Serialization is irrelevant here.

Typical usage involves Transformers in the upstream Pipeline, which set metadata for you. One common example is StringIndexer.
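For example, a sketch under the question's setup (the indexer, once fitted, writes the learned levels into its output column's metadata, so the encoder's dimension is fixed by the fitted pipeline rather than re-derived at prediction time):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

// StringIndexer is an Estimator: it learns the levels during fit()
// and attaches them as nominal metadata to its output column
val indexer = new StringIndexer()
  .setInputCol("class")
  .setOutputCol("class_idx")

// OneHotEncoder reads the number of levels from that metadata
// instead of guessing from the data it sees at transform time
val encoder = new OneHotEncoder()
  .setInputCol("class_idx")
  .setOutputCol("class_one_hot")

val model = new Pipeline().setStages(Array(indexer, encoder)).fit(df)

Note that StringIndexer assigns indices by label frequency, so the encoded positions no longer correspond to the raw integer values; inside a pipeline that is usually irrelevant.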

It is still possible to set metadata manually, but it is more involved:

import org.apache.spark.ml.attribute.NominalAttribute

// Declare the column as nominal with explicit levels "0" through "5",
// so OneHotEncoder can read the output dimension from the metadata
val meta = NominalAttribute.defaultAttr
  .withName("class")
  .withValues("0", (1 to 5).map(_.toString): _*)
  .toMetadata

// Attach the metadata by re-aliasing the column before transforming
loadedModel.transform(df2.select($"class".as("class", meta), $"output"))
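With six declared levels ("0" through "5") and the encoder's default dropLast = true, the loaded model should again emit vectors of size 5, matching the training feature space.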

Similarly in Python (needs Spark >= 2.2):

from pyspark.sql.functions import col

# Equivalent nominal metadata: six levels, "0" through "5"
meta = {"ml_attr": {
    "vals": [str(x) for x in range(6)],   # Provide the full set of levels
    "type": "nominal",
    "name": "class"}}

# Attach the metadata via alias() before transforming
loaded.transform(
    df.withColumn("class", col("class").alias("class", metadata=meta))
)

Metadata can also be attached using a number of different methods; see: How to change column metadata in pyspark?