I want to import a PMML model and compute a score with it using Spark. Everything works fine when I do not use Spark, but I cannot use my method inside a mapper.

The problem is that I need an Evaluator object (org.jpmml.evaluator.Evaluator), which does not seem to be Serializable. So I tried to make it Serializable with the following wrapper class:

package util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.jpmml.evaluator.Evaluator;

public class SerializableEvaluator implements Serializable {

    private static final long serialVersionUID = 6631604036553063657L;
    private Evaluator evaluator;

    public SerializableEvaluator(Evaluator evaluator) {
        this.evaluator = evaluator;
    }

    public Evaluator getEvaluator() {
        return evaluator;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(evaluator);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        this.evaluator = (Evaluator) in.readObject();
    }
}

I have also made all of my own classes serializable.

Here is a sample of my code:

        logger.info("Print 5 first rows----------------------------");
        strTitanicRDD
                .take(5)
                .forEach(row -> logger.info(row));
        logger.info("Print 5 first Titatnic Obs---------------------");
        strTitanicRDD
                .map(row -> new TitanicObservation(row))
                .take(5)
                .forEach(titanic -> logger.info(titanic.toString()));
        logger.info("Print 5 first Scored Titatnic Obs---------------");

        try {
            strTitanicRDD
                .map(row -> new TitanicObservation(row))
                .map(
                    new Function<TitanicObservation, String>() {

                        private static final long serialVersionUID = -2968122030659306400L;

                        @Override
                        public String call(TitanicObservation titanic) throws Exception {
                            String res = PmmlUtil.computeScoreTitanic(evaluator, titanic);
                            return res;
                        }

                    })
                .take(5)
                .forEach(row -> logger.info(row));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

But I do not think my code will help you solve my problem, which is clear from the logs:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:89)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46)
    at score.acv.AppWithSpark.main(AppWithSpark.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more

1 Answer

Behind the org.jpmml.evaluator.Evaluator interface there's an instance of some org.jpmml.evaluator.ModelEvaluator subclass. The class ModelEvaluator and all its subclasses are serializable by design. The problem pertains to the org.dmg.pmml.PMML object instance that you provided to the ModelEvaluatorFactory#newModelManager(PMML) method in the beginning.

In brief, every PMML class model object can have SAX Locator information attached to it. This is useful in the development and testing stages for locating offending XML content. However, in the production stage this information should no longer be kept around. You can disable SAX Locator information either by configuring your JAXB runtime properly, or by simply clearing existing SAX Locator instances by invoking PMMLObject#setLocator(Locatable) with a null argument. The latter functionality is formalized by the org.jpmml.model.visitors.LocatorNullifier Visitor class.
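
For illustration, here is a minimal sketch of loading a PMML file and nullifying the locators before building the evaluator. The class name PmmlLoaderSketch is made up, and the factory method name (ModelEvaluatorFactory#newModelManager(PMML), as mentioned above) varies between JPMML-Evaluator releases, so treat this as an outline rather than the exact API:

import java.io.InputStream;

import javax.xml.transform.Source;

import org.dmg.pmml.PMML;
import org.jpmml.evaluator.Evaluator;
import org.jpmml.evaluator.ModelEvaluator;
import org.jpmml.evaluator.ModelEvaluatorFactory;
import org.jpmml.model.ImportFilter;
import org.jpmml.model.JAXBUtil;
import org.jpmml.model.visitors.LocatorNullifier;
import org.xml.sax.InputSource;

public class PmmlLoaderSketch {

    public static Evaluator createEvaluator(InputStream is) throws Exception {
        // Unmarshal the PMML document; ImportFilter translates older PMML schema versions
        Source source = ImportFilter.apply(new InputSource(is));
        PMML pmml = JAXBUtil.unmarshalPMML(source);

        // Clear all SAX Locator instances, which are the non-serializable part
        LocatorNullifier nullifier = new LocatorNullifier();
        nullifier.applyTo(pmml);

        // Build the model evaluator from the "cleaned" PMML class model object
        ModelEvaluatorFactory factory = ModelEvaluatorFactory.newInstance();
        ModelEvaluator<?> evaluator = factory.newModelManager(pmml);
        return evaluator;
    }
}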

For a complete example, please see the org.jpmml.spark.EvaluatorUtil utility class (especially around lines 73 to 75) of the official JPMML-Spark project. Why don't you use JPMML-Spark in the first place?
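
For instance, assuming JPMML-Spark's EvaluatorUtil exposes a createEvaluator(InputStream) helper (check the class referenced above for the exact signature), obtaining a ready-to-use evaluator would be roughly:

// Hypothetical usage sketch; the file name is just an example
InputStream is = new FileInputStream("titanic.pmml");
Evaluator evaluator = EvaluatorUtil.createEvaluator(is);
is.close();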