1
votes

I configured event-time processing and have a connected stream with a CoFlatMapFunction. I am writing a test case but I am seeing that the FlatMapFunction does not invoke its methods flatMap1() and flatMap2() with events in event-time order.

Some pseudocode to clarify

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

DataStream<Integer> evenStream = env.addSource(new SourceFunction<Integer>(){
         public void run(SourceContext<Integer> ctxt){
                for (i=0; i < 20; i=i+2){
                     ctxt.collectWithTimestamp(i, i);
                     ctxt.emitWatermark(i);
                } 
         }
 }
 )
DataStream<Integer> oddStream  = env.addSource(new SourceFunction<Integer>(){
         public void run(SourceContext<Integer> ctxt){
                for (i=1; i < 21; i=i+2){
                     ctxt.collectWithTimestamp(i, i); // Using i as timestamp and watermark for this sample code, but in real code, I am using using timestamp of real event 
                     ctxt.emitWatermark(i);
                } 
         }
 }
 )

evenStream
   .connect(oddStream)
   .flatMap(new CoFlatMapFunction<Integer, Integer, Integer>(){

        public void flatMap1(Integer evenNumber, Collector<Integer> out){                  
               System.out.println(evenNumber);
        }
        public void flatMap2(Integer oddNumber, Collector<Integer> out){
               System.out.println(oddNumber);
        }

   }
   );

When I run this, I expect it to print:

0,1,2,3,4....21

This is because I am setting the timestamp of even and odd numbers alternatively. In other words, 0 has the lowest timestamp, followed by 1, followed by 2 etc.

But it is printing all even numbers first followed by odd numbers.

In summary, I expected flatMap1() and flatMap2() to be called in the order of timestamps I set in the events. But that's not happening.

1

1 Answers

0
votes

Flink's co-functions (functions on connected streams) do not give guarantees in which order their methods are called. The methods (e.g., flatMap1() and flatMap2()) are called whenever an event is available from either input. In your example, the amount of data that the even source generates is too small such that all data is already processed when the odd numbers arrive.

So, how does event-time processing work for co-functions?

The watermarks of the co-function is always the minimum watermark of both inputs. For a CoFlatMapFunction this is not really important, because you can neither read the current watermark nor the timestamps of the records. However, with a CoProcessFunction you have access to both and can register timers that are called when the watermark reaches a certain point in time. If you want to sort the out-going stream on event time, you need to buffer incoming events (in state) and when the watermark progresses, you can emit all records in order up to the time over the watermark.