Got stuck a bit with CoFlatMapFunction
. It seems to work fine if I place it on the DataStream
before window but fails if placed after window's “apply” function.
I was testing two streams, main “Features” on flatMap1
constantly ingesting data and control stream “Model” on flatMap2
changing the model on request.
I am able to set and see b0/b1 properly set in flatMap2
, but flatMap1
always see b0 and b1 as was set to 0 at the initialization.
Am I missing something obvious here?
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
private static final long serialVersionUID = 1L;
Double b0;
Double b1;
public applyModel(){
b0=0.0;
b1=0.0;
}
@Override
public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
System.out.print("Main: " + this + "\n");
}
@Override
public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
System.out.print("Old Model: " + this + "\n");
b0 = value.getB0();
b1 = value.getB1();
System.out.print("New Model: " + this + "\n");
}
@Override
public String toString(){
return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
}
}
apply
function? Maybe you can share the respective code with us. – Till Rohrmann