I am trying to use Spark Streaming and want a global state object that can be updated after each batch is processed. As far as I can tell, there are at least two options:
1. Use mapWithState, where Spark updates the state automatically after each batch has been processed
2. Use the updateStateByKey function, where I have to trigger the update myself
Either option would be fine for me, but I get the same error in both cases. Here is my example code for each, together with the resulting error:
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
    new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> call(String word, Optional<Integer> one,
                State<Integer> state) {
            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
            state.update(sum);
            return output;
        }
    };
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
This one is taken from the example provided by Spark itself. Sadly, I receive the following error regarding StateSpec.function(...):
The method function(Function3< KeyType,Optional< ValueType>,State< StateType>,MappedType>) in the type StateSpec is not applicable for the arguments (Function3< String,Optional< Integer>,State< Integer>,Tuple2< String,Integer>>)
Using updateStateByKey instead:
JavaPairDStream<String, Long> runningCounts = processed.updateStateByKey(
    new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
        public Optional<Long> call(List<Long> values, Optional<Long> state) {
            Long newSum = state.orElse(0L);
            for (long x : values) {
                newSum += x;
            }
            return Optional.of(newSum);
        }
    });
This results in a similar error:
The method updateStateByKey(Function2< List< Long>,Optional< S>,Optional< S>>) in the type JavaPairDStream< String,Long> is not applicable for the arguments (new Function2< List,Optional< Long>,Optional< Long>>(){})
A snapshot of my imports:
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
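One thing I suspect (but have not been able to confirm) is that my Optional import does not match the one Spark's signatures expect, since several classes named Optional exist in different packages. To reproduce that kind of mismatch outside of Spark, I wrote this minimal, self-contained sketch; the class names in it (LibOptional, runUpdate) are hypothetical stand-ins of my own, not Spark's real API:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of how a same-named class from the "wrong" package fails to match
// a generic method signature. No Spark classes are used here; LibOptional
// plays the role of the library's own Optional type.
public class OptionalMismatchDemo {

    // stand-in for the library's Optional
    static class LibOptional<T> {
        final T value;
        LibOptional(T value) { this.value = value; }
        T orElse(T other) { return value != null ? value : other; }
    }

    interface Function2<A, B, R> { R call(A a, B b); }

    // stand-in for a method like updateStateByKey: it only accepts
    // functions written against LibOptional
    static <S> S runUpdate(List<Long> values,
                           LibOptional<S> state,
                           Function2<List<Long>, LibOptional<S>, S> updateFunc) {
        return updateFunc.call(values, state);
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L);

        // This compiles, because the function is written against LibOptional:
        Long sum = runUpdate(values, new LibOptional<Long>(10L),
            (vs, state) -> {
                long acc = state.orElse(0L);
                for (long v : vs) acc += v;
                return acc;
            });
        System.out.println(sum); // prints 16

        // A Function2 written against java.util.Optional<Long> instead would
        // be rejected with "not applicable for the arguments", which looks
        // very much like the error I am getting.
    }
}
```

If that theory is right, the fix would be to import the Optional class from the same package family as the rest of the Spark types, but I am not sure which one my Spark version expects.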
I hope someone can help me find my mistake.