0
votes

I have a use case in which I get a MLlib model and a stream and want to get score (predict) a stream of data.

There are some examples and material on this issue using Scala but I cant translate it to Java.

Trying to run predict inside the map functions (as shown in the spark documentation)

JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
  new Function<LabeledPoint, Tuple2<Object, Object>>() {
    public Tuple2<Object, Object> call(LabeledPoint p) {
      Double score = model.predict(p.features());
      return new Tuple2<Object, Object>(score, p.label());
    }
  }
);

results in error:

invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation

My input is a coma separated two integers which I map into:

JavaDStream<Tuple2<Integer, Integer>> pairs

Then I want to transform it into:

JavaPairDStream<Integer, Double> scores

Where Double is the predict result and the Integer is a key so I will be able to reduce by the key.
This method results in a need to create a new DStream inside an existing one which I failed to do.

The predict method can be applied on RDD but I couldn't create a DStream back from it (must return void):

    pairs.foreachRDD(new Function<JavaRDD<Tuple2<Object, Object>>, Void >(){
    @Override
    public Void call(JavaRDD<Tuple2<Object, Object>> arg0) throws Exception {
        // TODO Auto-generated method stub

        RDD<Rating> a = sameModel.predict(arg0.rdd());

    }

  });

Any ideas on how this might be achieved?

1
DStream represents time-batched RDDs. I would be more interested to know what can you achieve by reconverting it to DStream and not as an RDD.CᴴᴀZ

1 Answers

0
votes

As far as I can tell problem here is not really a translation to Java but a specific model you use. MLlib provides two types of models - local and distributed. Local models can be serialized and used inside the map.

MatrixFactorizationModel model falls into the second category. It means it is internally using distributed data structures for predictions hence cannot be used from an action or transformation. If you want to use it for predictions on a whole RDD you have to pass it in the predict method like this:

model.predict(JavaRDD.toRDD(test))

See Java examples in Collaborative Filtering documentaion for details about required format of test data.