I configured event-time processing and have a connected stream with a CoFlatMapFunction. While writing a test case, I noticed that the CoFlatMapFunction does not invoke its flatMap1() and flatMap2() methods in event-time order.
Here is a simplified version of the code:
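```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

public class ConnectedStreamsOrderTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Emits 0, 2, ..., 18, using each number as its own timestamp and watermark
        DataStream<Integer> evenStream = env.addSource(new SourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) {
                for (int i = 0; i < 20; i += 2) {
                    ctx.collectWithTimestamp(i, i);
                    ctx.emitWatermark(new Watermark(i));
                }
            }

            @Override
            public void cancel() {}
        });

        // Emits 1, 3, ..., 19. Using i as timestamp and watermark for this sample code;
        // the real code uses the timestamp of the real event.
        DataStream<Integer> oddStream = env.addSource(new SourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) {
                for (int i = 1; i < 21; i += 2) {
                    ctx.collectWithTimestamp(i, i);
                    ctx.emitWatermark(new Watermark(i));
                }
            }

            @Override
            public void cancel() {}
        });

        evenStream
            .connect(oddStream)
            .flatMap(new CoFlatMapFunction<Integer, Integer, Integer>() {
                @Override
                public void flatMap1(Integer evenNumber, Collector<Integer> out) {
                    System.out.println(evenNumber);
                }

                @Override
                public void flatMap2(Integer oddNumber, Collector<Integer> out) {
                    System.out.println(oddNumber);
                }
            });

        env.execute();
    }
}
```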
When I run this, I expect it to print:
0, 1, 2, 3, 4, ..., 19
This is because I set the timestamps of the even and odd numbers so that they alternate: 0 has the lowest timestamp, followed by 1, then 2, and so on.
Instead, it prints all the even numbers first, followed by all the odd numbers.
In summary, I expected flatMap1() and flatMap2() to be called in the order of the timestamps I assigned to the events, but that is not what happens.
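For illustration, the ordering expected above can be sketched without Flink. `EventTimeMergeSketch` and its methods are hypothetical names, not Flink APIs: the sketch buffers elements from both inputs in a priority queue and releases them only once the combined watermark has reached their timestamps. The combined watermark is taken as the minimum of the two input watermarks, which is how a Flink two-input operator advances its event-time clock. The `main` method replays the delivery order observed above (all even numbers first, then all odd numbers), and the sketch still emits them in timestamp order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not a Flink API: merges two inputs in event-time order.
class EventTimeMergeSketch {
    // A buffered element together with its event timestamp.
    private static final class Timestamped {
        final long timestamp;
        final int value;

        Timestamped(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Elements from both inputs, ordered by timestamp.
    private final PriorityQueue<Timestamped> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Timestamped t) -> t.timestamp));
    private long watermark1 = Long.MIN_VALUE;
    private long watermark2 = Long.MIN_VALUE;
    private final List<Integer> output = new ArrayList<>();

    // Called for an element of either input (where flatMap1/flatMap2 would run).
    void onElement(long timestamp, int value) {
        buffer.add(new Timestamped(timestamp, value));
    }

    void onWatermark1(long watermark) {
        watermark1 = Math.max(watermark1, watermark);
        flush();
    }

    void onWatermark2(long watermark) {
        watermark2 = Math.max(watermark2, watermark);
        flush();
    }

    // Release every buffered element whose timestamp is covered by both watermarks.
    private void flush() {
        long combined = Math.min(watermark1, watermark2);
        while (!buffer.isEmpty() && buffer.peek().timestamp <= combined) {
            output.add(buffer.poll().value);
        }
    }

    List<Integer> output() {
        return output;
    }

    public static void main(String[] args) {
        EventTimeMergeSketch op = new EventTimeMergeSketch();
        // Replay the observed delivery order: all even numbers first...
        for (int i = 0; i < 20; i += 2) {
            op.onElement(i, i);
            op.onWatermark1(i);
        }
        // ...then all odd numbers.
        for (int i = 1; i < 21; i += 2) {
            op.onElement(i, i);
            op.onWatermark2(i);
        }
        // Final watermarks mark the end of both inputs and flush the rest.
        op.onWatermark1(Long.MAX_VALUE);
        op.onWatermark2(Long.MAX_VALUE);
        // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
        System.out.println(op.output());
    }
}
```

The sketch trades latency and memory for ordering: nothing is released until both inputs have advanced their watermarks, so a slow or idle input holds back the whole buffer.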