I'm trying to build a version of the decision tree classification example from Spark 2.0.2, org.apache.spark.examples.ml.JavaDecisionTreeClassificationExample. I can't use it directly because it reads libsvm-encoded data. I want to avoid libsvm (undocumented, AFAIK) so that I can classify ordinary datasets more easily, so I'm trying to adapt the example to use a Kryo-encoded dataset instead.

The issue originates in the map call below, specifically in the consequences of using Encoders.kryo as the encoder, as suggested in "SparkML feature vectors and Spark 2.0.2 Encoders in Java":

public SMLDecisionTree(Dataset<Row> incomingDS, final String label, final String[] features)
{
    this.incomingDS = incomingDS;
    this.label = label;
    this.features = features;
    this.mapSet = new StringToDoubleMapperSet(features);

    this.sdlDS = incomingDS
            .select(label, features)
            .filter(new FilterFunction<Row>()
            {
                public boolean call(Row row) throws Exception
                {
                    return !row.getString(0).equals(features[0]); // header
                }
            })
            .map(new MapFunction<Row, LabeledFeatureVector>()
            {
                public LabeledFeatureVector call(Row row) throws Exception
                {
                    double labelVal = mapSet.addValue(0, row.getString(0));
                    double[] featureVals = new double[features.length];
                    for (int i = 1; i < row.length(); i++)
                    {
                        Double val = mapSet.addValue(i, row.getString(i));
                        featureVals[i - 1] = val;
                    }
                    return new LabeledFeatureVector(labelVal, Vectors.dense(featureVals));
                }
                // https://stackguides.com/questions/36648128/how-to-store-custom-objects-in-a-dataset
            }, Encoders.kryo(LabeledFeatureVector.class));

    Dataset<LabeledFeatureVector>[] splits = sdlDS.randomSplit(new double[] { 0.7, 0.3 });
    this.trainingDS = splits[0];
    this.testDS = splits[1];
}
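
The StringToDoubleMapperSet helper used in the map call is not shown above. A minimal sketch of what it might look like, assuming it simply gives each distinct string in a column the next free index as a double (slot 0 for the label, slots 1..n for the features), is below. The field name and the indexing scheme are assumptions for illustration, not the actual class.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the real StringToDoubleMapperSet is not shown in the question.
// It is Serializable because the anonymous MapFunction captures it.
class StringToDoubleMapperSet implements Serializable {
    private static final long serialVersionUID = 1L;

    // One value-to-index map per column: slot 0 is the label, slots 1..n the features.
    private final List<Map<String, Double>> columnMaps = new ArrayList<>();

    StringToDoubleMapperSet(String[] features) {
        for (int i = 0; i <= features.length; i++) {
            columnMaps.add(new HashMap<String, Double>());
        }
    }

    // Returns the index already assigned to this value in the given column,
    // or assigns the next free index (0.0, 1.0, 2.0, ...) and returns it.
    synchronized double addValue(int column, String value) {
        Map<String, Double> map = columnMaps.get(column);
        Double existing = map.get(value);
        if (existing != null) {
            return existing;
        }
        double next = map.size();
        map.put(value, next);
        return next;
    }

    public static void main(String[] args) {
        StringToDoubleMapperSet mapSet =
                new StringToDoubleMapperSet(new String[] { "color", "size" });
        System.out.println(mapSet.addValue(0, "yes")); // first label value seen
        System.out.println(mapSet.addValue(0, "no"));  // second distinct label value
        System.out.println(mapSet.addValue(0, "yes")); // repeated value reuses its index
        System.out.println(mapSet.addValue(1, "red")); // each column is numbered independently
    }
}
```

With a helper like this, every value that reaches the map call has already been turned into a double, which is why the resulting dataset holds a double label and a dense vector of doubles.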

Using the Kryo encoder affects the StringIndexer and VectorIndexer from the original Spark example, which are unable to handle the resulting Kryo-encoded dataset. Here is the pipeline-building code, taken from the Spark decision tree example:

public void run() throws IOException
{
    sdlDS.show();
    StringIndexerModel labelIndexer = new StringIndexer()
            .setInputCol("label")
            .setOutputCol("indexedLabel")
            .fit(sdlDS);

    VectorIndexerModel featureIndexer = new VectorIndexer()
            .setInputCol("features")
            .setOutputCol("indexedFeatures")
            .setMaxCategories(4) // treat features with > 4 distinct values as continuous.
            .fit(sdlDS);

    DecisionTreeClassifier classifier = new DecisionTreeClassifier()
            .setLabelCol("indexedLabel")
            .setFeaturesCol("indexedFeatures");

    IndexToString labelConverter = new IndexToString()
            .setInputCol("prediction")
            .setOutputCol("predictedLabel")
            .setLabels(labelIndexer.labels());

    Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]
    { labelIndexer, featureIndexer, classifier, labelConverter });

This code apparently expects a dataset with "label" and "features" columns, holding the label and a Vector of double-encoded features respectively. The problem is that Kryo produces a single column named "value" that seems to hold a byte array. I know of no documentation for how to convert this into what the original StringIndexer and VectorIndexer expect. Can someone help? Java please.

1 Answer

Don't use the Kryo encoder in the first place. It is very limited in general and not applicable here at all. The simplest solution is to drop the custom class and use a Row encoder. First you'll need a bunch of imports:

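import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.ml.linalg.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;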

and a schema:

List<StructField> fields = new ArrayList<>();

fields.add(DataTypes.createStructField("label", DataTypes.DoubleType, false));
fields.add(DataTypes.createStructField("features", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);

The encoder can be defined like this:

Encoder<Row> encoder = RowEncoder.apply(schema);

and used as shown below:

Dataset<Row> inputDs = spark.read().json(sc.parallelize(Arrays.asList(
        "{\"label\": 1.0, \"features\": \"foo\"}"
)));

inputDs.map(new MapFunction<Row, Row>() {
    public Row call(Row row) {
        return RowFactory.create(1.0, Vectors.dense(1.0, 2.0));
    }
}, encoder);