4 votes

I have a stream (Kafka messages being streamed onto a topic) read with a Flink Kafka consumer, and I notice an interesting behavior that I'm looking to address.

When the data is streaming in, if it stops before a window is 'complete', or if the data ends (after a few windows) without reaching the end of the window, the rest of the pipeline does not trigger.

Example flow:

    env.addSource(kafkaConsumer)
       .flatMap(new TokenMapper())
       .keyBy("word")
       .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
       .reduce(new CountTokens())
       .flatMap(new ConvertToString())
       .addSink(producer);

I am using the FlinkKafkaConsumer010 with the env TimeCharacteristic set to EventTime, and consumer.assignTimestampsAndWatermarks(new PeriodicWatermarks()).
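For reference, the wiring looks roughly like this (a minimal sketch; the topic name, the Kafka properties, and the 5-second out-of-orderness bound are placeholders I've made up):

    // placeholder Kafka config
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProps.setProperty("group.id", "my-group");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010<String> kafkaConsumer =
            new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), kafkaProps);
    // 5000ms max out-of-orderness, just as an example value
    kafkaConsumer.assignTimestampsAndWatermarks(new PeriodicWatermarks(5000L));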

    private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String> {

        private long currentMaxTimestamp;
        private final long maxOutOfOrderness;

        public PeriodicWatermarks(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // watermark trails the highest timestamp seen so far by the allowed lateness
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(String t, long l) {
            // l is the event timestamp attached to the record
            currentMaxTimestamp = Math.max(currentMaxTimestamp, l);
            logger.info("TIMESTAMP: " + l);
            return l;
        }
    }

If my window is, say, 10 seconds, and my data stream only contains 8 seconds of data (and then stops streaming for some period of time), the flatMap->sink does not process until new, later data is streamed in.

Example data stream processing issue (each x is one piece of data per second):

      xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
      ^(not processed)           (until I get here)^

Similarly, if for example I had 35 seconds' worth of streaming data (and again my window is 10 seconds), only 3 windows' worth of data trigger, and the remaining 5 seconds' worth of data never processes.

     ...xxxxxxxxxx(10secs)xxxxx(5secs)------(gap)--(later more data)xxxxx
         (processed)        ^(not processed)          (until I get here)^

Finally, if my window is 10 seconds and I only have 5 seconds of streamed data, the flatMap->sink never happens.

My question is: is there a way to trigger the windowed data to process if no data is seen for some time?

If my data were being live-streamed in, I can see there being stretches of no data, and I would not want the last window (of, let's say, just 5 seconds' worth of data) to have to wait some undetermined time until new data comes in; I would want the results for that last window once the window time has passed.

Thinking out loud, this seems to be due to using EventTime instead of ProcessingTime, or my watermark is not being generated properly for the last window to actually trigger... not sure, maybe a bit of both? I would think this would be an issue for anyone: if your stream ends, the last bit never triggers. I could probably send an end-of-stream message, but this does not help if the stream ends because the source breaks upstream.

EDIT: So I changed to processing time and it does correctly process the data in the last window, so I guess EventTime is the culprit after all. I'm thinking a custom trigger or a properly advancing watermark might be the answer...
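For reference, the processing-time variant is just the same pipeline with the window assigner swapped (a sketch; only the window assigner and the time characteristic change, and with processing time the timestamp/watermark assigner isn't needed at all):

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    env.addSource(kafkaConsumer)
       .flatMap(new TokenMapper())
       .keyBy("word")
       .window(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
       .reduce(new CountTokens())
       .flatMap(new ConvertToString())
       .addSink(producer);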

Thanks for the help!

1 Answer

2 votes

I'll leave this up for posterity, as the issue was, as I thought, related to the watermark. The timestamp and watermark assigner (from assignTimestampsAndWatermarks) calls getCurrentWatermark(), and since I was setting the watermark based off the incoming entities (their timestamp minus a max offset), it would not update until it saw a new entity.

My solution is some sort of timer that eventually advances the watermark into the next window if no data has been seen for a configurable amount of time. I will not be able to process very late data, but I don't expect that to be a problem. This is intended behavior of EventTime processing.
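A rough sketch of what that timer could look like, leaning on the fact that Flink calls getCurrentWatermark() periodically (at the auto-watermark interval) even when no records arrive. The class name, the idleTimeout parameter, and the policy of jumping event time forward by the idle duration are assumptions for illustration, not tested code:

    private static class IdleAwareWatermarks implements AssignerWithPeriodicWatermarks<String> {

        private final long maxOutOfOrderness;
        private final long idleTimeout; // wall-clock ms of silence before forcing the watermark forward

        private long currentMaxTimestamp = Long.MIN_VALUE;
        private long lastEventWallClock = Long.MAX_VALUE;

        public IdleAwareWatermarks(long maxOutOfOrderness, long idleTimeout) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            this.idleTimeout = idleTimeout;
        }

        @Override
        public long extractTimestamp(String t, long l) {
            lastEventWallClock = System.currentTimeMillis();
            currentMaxTimestamp = Math.max(currentMaxTimestamp, l);
            return l;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // Flink invokes this at the configured auto-watermark interval,
            // so it fires even when no records are arriving.
            if (currentMaxTimestamp == Long.MIN_VALUE) {
                return new Watermark(Long.MIN_VALUE); // nothing seen yet
            }
            long idleFor = System.currentTimeMillis() - lastEventWallClock;
            if (idleFor > idleTimeout) {
                // Assumption: advance event time by the idle duration so the
                // open window can close; data older than this becomes late.
                currentMaxTimestamp += idleFor;
                lastEventWallClock = System.currentTimeMillis();
            }
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

Pairing this with a reasonably small env.getConfig().setAutoWatermarkInterval(...) keeps the idle check responsive.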