I am working with Spark 2.1.1 on a dataset with ~2000 features and trying to create a basic ML Pipeline, consisting of some Transformers and a Classifier.
Let's assume for the sake of simplicity that the Pipeline I am working with consists of a VectorAssembler, a StringIndexer and a Classifier, which would be a fairly common use case.
// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("featuresRaw")
val labelIndexer: StringIndexer = new StringIndexer()
.setInputCol("TARGET")
.setOutputCol("indexedLabel")
// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featuresRaw")
.setMaxBins(30)
// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(5))
.addGrid(rf.maxDepth, Array(5))
.build()
// Evaluator for the CrossValidator below, which treats the Pipeline as an Estimator and jointly chooses parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setLabelCol("indexedLabel")
If the pipeline steps are separated into a transformer pipeline (VectorAssembler + StringIndexer) and a second classifier pipeline, and if the unnecessary columns are dropped between the two pipelines, training succeeds. This means that, in order to reuse the models, two PipelineModels have to be saved after training and an intermediary preprocessing step has to be introduced (a save/load sketch follows the code below).
// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)
val mainPipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator()
.setEstimator(mainPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
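For completeness, persisting and reusing the split setup would look roughly like this; the save paths are placeholders, and write.overwrite().save / PipelineModel.load are the standard Spark ML persistence calls:
import org.apache.spark.ml.PipelineModel
// Both fitted models have to be saved separately (paths are placeholders)
prePipeline.write.overwrite().save("/models/preprocessing")
bestModel.write.overwrite().save("/models/rf")
// ... and re-applied in the same order when scoring new data
val preLoaded = PipelineModel.load("/models/preprocessing")
val rfLoaded = PipelineModel.load("/models/rf")
val predictions = rfLoaded.transform(preLoaded.transform(dfTest))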
The (imho) much cleaner solution would be to merge all pipeline stages into one pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, assmbleFeatures, rf))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
// This will fail!
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]
However, putting all PipelineStages into one Pipeline leads to the following exception, probably due to the issue this PR will eventually solve:
ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
The reason for this is that the VectorAssembler effectively doubles (in this example) the amount of data in the DataFrame, as there is no transformer that could drop the unnecessary columns. (See spark pipeline vector assembler drop other columns)
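To make the doubling concrete, here is a tiny toy example (the column names f1 and f2 are made up); the assembler only appends its output column, so every source column is still carried along into the later stages:
import org.apache.spark.ml.feature.VectorAssembler
import spark.implicits._
// Toy DataFrame with two feature columns and a label column
val toy = Seq((1.0, 2.0, 0.0)).toDF("f1", "f2", "TARGET")
val assembled = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))
  .setOutputCol("featuresRaw")
  .transform(toy)
// The original inputs are still present next to the assembled vector:
// Array(f1, f2, TARGET, featuresRaw)
assembled.columns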
To reproduce the issue: the example works on the Golub dataset, and the following preprocessing steps are necessary:
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)
// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)
// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))
// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")
As I am new to Spark, I am not sure what would be the best way to solve this issue. Would you suggest...
- to create a new transformer, which drops columns and which can be incorporated into the pipeline?
- to split both Pipelines and introduce the intermediary dropping step?
- anything else? :)
Or am I missing anything important (pipeline steps, PR, etc.) that would solve this issue?
Edit:
I implemented a new Transformer DroppingVectorAssembler, which drops unnecessary columns, but the same exception is thrown.
Besides that, setting spark.sql.codegen.wholeStage to false does not solve the issue.
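For context, a minimal sketch of what such a column-dropping Transformer can look like is below; the class name ColumnDropper and its constructor are made up for illustration, this is not the actual DroppingVectorAssembler:
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: keep only the given columns, drop everything else
class ColumnDropper(override val uid: String, columnsToKeep: Seq[String])
    extends Transformer {

  def this(columnsToKeep: Seq[String]) =
    this(Identifiable.randomUID("columnDropper"), columnsToKeep)

  override def transform(dataset: Dataset[_]): DataFrame = {
    val toDrop = dataset.columns.filter(c => !columnsToKeep.contains(c))
    dataset.toDF.drop(toDrop: _*)
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.filter(f => columnsToKeep.contains(f.name)))

  override def copy(extra: ParamMap): ColumnDropper =
    new ColumnDropper(uid, columnsToKeep)
}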
Do you mean by "the VectorAssembler effectively doubles the number of columns" that the amount of data is doubled? I'd argue that the VectorAssembler groups n columns into one Array column holding n elements. – Boern