
I am trying to use Spark Streaming and want a global state object that is updated after each batch has been processed. As far as I can tell, there are at least two options:

1. Use `mapWithState`, where Spark updates the state automatically after each batch is processed.
2. Use `updateStateByKey`, where I have to perform the update myself.

Either approach would be fine for me, but I get the same error in both cases. Here is my example code for each, together with its error:

    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> call(String word, Optional<Integer> one,
              State<Integer> state) {
            // Add the new count to whatever state has accumulated so far
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
          }
        };

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

This is taken from the example provided by Spark itself. Sadly, I receive the following error regarding `StateSpec.function(...)`:

The method function(Function3< KeyType,Optional< ValueType>,State< StateType>,MappedType>) in the type StateSpec is not applicable for the arguments (Function3< String,Optional< Integer>,State< Integer>,Tuple2< String,Integer>>)

Using:

    JavaPairDStream<String, Long> runningCounts =
        processed.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
          public Optional<Long> call(List<Long> values, Optional<Long> state) {
            // Start from the previous state (or 0) and add the new batch's values
            Long newSum = state.orElse(0L);
            for (long x : values) {
              newSum += x;
            }
            return Optional.of(newSum);
          }
        });

Results in a similar error:

The method updateStateByKey(Function2< List< Long>,Optional< S>,Optional< S>>) in the type JavaPairDStream< String,Long> is not applicable for the arguments (new Function2< List,Optional< Long>,Optional< Long>>(){})

A snapshot of my imports:

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;

I hope someone can help me find my mistake.

Could you please post your import statements? It looks like you have imported Function2 and Function3 from the wrong packages. – T. Gawęda

Thanks for the info, I should have added those. Sadly, they are imported from the right package (I think), the same one I imported the other functions from. – Daniel Töws

Have you added spark-streaming to your project (Maven, etc.)? I wrote a small application with exactly the same mapping function as yours and it works. – T. Gawęda

I found the error; you put me on the right track. It turns out the example I read was for Spark 2.0.2, while I use 1.6.1. In between, they changed the package that the Optional class is imported from. I was importing Optional from the wrong package; after correcting that, everything runs fine. Thanks for the hint! How do I properly mark this in my question now? – Daniel Töws

1 Answer


One more point to add: if you are using the latest Spark 2.3.0, use the package below to import Optional, which resolves the same issue.

Java code:

import org.apache.spark.api.java.Optional;
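
As a sketch of the difference the comments uncovered (the Spark 1.6 line is an assumption based on the question's resolution, where importing Optional from the old package caused the mismatch):

```java
// Spark 1.6.x: the Java streaming API was written against Guava's Optional
// import com.google.common.base.Optional;

// Spark 2.x and later: the API uses Spark's own Optional class instead
import org.apache.spark.api.java.Optional;
```

With the wrong Optional on the classpath, the code still compiles on its own, but the `Function2`/`Function3` signatures no longer match what `updateStateByKey` and `StateSpec.function` expect, which is why the compiler reports "not applicable for the arguments" rather than a missing class.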