
I am trying to use event time in my Flink job, and I'm using BoundedOutOfOrdernessTimestampExtractor to extract timestamps and generate watermarks. But some of my input Kafka topics are sparse streams that can go without any data for a long time, and as a result getResult in my AggregateFunction is never called. I can see data going into the add function.

I have called getEnv().getConfig().setAutoWatermarkInterval(1000L), and I tried a tumbling window:

eventsWithKey
        .keyBy(entry -> (String) entry.get(key))
        .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
        .allowedLateness(WINDOW_LATENESS)
        .aggregate(new CountTask(basicMetricTags, windowInMinutes));

I also tried a session window:

eventsWithKey
        .keyBy(entry -> (String) entry.get(key))
        .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
        .aggregate(new CountTask(basicMetricTags, windowInMinutes));

All the watermark metrics show "No Watermark". How can I get Flink to ignore the input that has no watermark?


4 Answers

5
votes

FYI, this is commonly referred to as the "idle source" problem. It occurs because whenever a Flink operator has two or more inputs, its watermark is the minimum of the watermarks from its inputs. If one of those inputs stalls, that input's watermark no longer advances, and so neither does the operator's.
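To make the minimum rule concrete, here is a toy model in plain Java. It is not Flink code, and the class and constant names are made up for illustration. It combines per-input watermarks the way an operator does, using Long.MIN_VALUE to stand for an input that has never emitted a watermark, which is roughly what the web UI reports as "No Watermark".

```java
import java.util.Arrays;

// Toy model, not Flink code: an operator's watermark is the minimum of its inputs' watermarks.
final class MinWatermarkDemo {

    // Stands for "this input has not emitted any watermark yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    static long combinedWatermark(long[] inputWatermarks) {
        // With no inputs at all there is nothing to combine.
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // Two busy inputs and one input that has been idle since the job started.
        long[] inputs = {60_000L, 75_000L, NO_WATERMARK};
        long wm = combinedWatermark(inputs);
        System.out.println(wm == NO_WATERMARK ? "No Watermark" : "watermark = " + wm);
    }
}
```

A single silent input is enough to pin the combined value at the bottom, no matter how far the other inputs have progressed.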

Note that Flink does not have per-key watermarking -- a given operator is typically multiplexed across events for many keys. So long as some events are flowing through a given task's input streams, its watermark will advance, and event time timers for idle keys will still fire. For this "idle source" problem to occur, a task has to have an input stream that has become completely idle.
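The point about timers for idle keys can be illustrated with another toy model (plain Java, hypothetical names). It models a single operator instance with one shared watermark, 60-second tumbling windows, and a bounded-out-of-orderness watermark of maxTimestamp - bound - 1, emitted after every event for simplicity. Events for key "b" alone push the watermark past the end of key "a"'s window, so "a"'s timer fires even though "a" has gone quiet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model: one operator instance, one watermark shared by every key it handles.
final class SharedWatermarkDemo {
    private final long windowSize;
    private final long bound; // max out-of-orderness in ms
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    // Event-time timers: window-end timestamp -> keys that have a window ending there.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    SharedWatermarkDemo(long windowSize, long bound) {
        this.windowSize = windowSize;
        this.bound = bound;
    }

    // Processes one event (non-negative timestamp) and returns the timers ("key@time") that fired.
    List<String> onEvent(String key, long timestamp) {
        // Tumbling window [start, start + size); its timer is at the window's last millisecond.
        long windowEnd = timestamp - (timestamp % windowSize) + windowSize - 1;
        timers.computeIfAbsent(windowEnd, t -> new TreeSet<>()).add(key);

        maxTimestamp = Math.max(maxTimestamp, timestamp);
        watermark = Math.max(watermark, maxTimestamp - bound - 1);

        // Every timer at or below the watermark fires, whichever key it belongs to.
        List<String> fired = new ArrayList<>();
        NavigableMap<Long, Set<String>> due = timers.headMap(watermark, true);
        for (Map.Entry<Long, Set<String>> e : due.entrySet()) {
            for (String k : e.getValue()) {
                fired.add(k + "@" + e.getKey());
            }
        }
        due.clear(); // removes the fired timers from the backing map
        return fired;
    }
}
```

Feeding it a@10000, b@30000 and then b@70000 fires both "a@59999" and "b@59999" on the third event: key "a" needed no further events of its own.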

If you can arrange for it, the best solution is to have your data sources include keepalive events. This will allow you to advance your watermarks with confidence, knowing that the source is simply idle, rather than, for example, offline.
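A minimal sketch of the keepalive idea, assuming a hypothetical event type with a keepalive flag. In a real job this logic would live inside a Flink WatermarkGenerator, and the keepalive records would be filtered out after timestamps are assigned so that they never reach the windows. Here it is plain Java so it can run on its own.

```java
// Toy sketch: keepalive records advance event time but carry no data.
final class KeepaliveWatermarks {

    // Hypothetical event type; a real source would define its own.
    static final class Event {
        final long timestamp;
        final boolean keepalive;

        Event(long timestamp, boolean keepalive) {
            this.timestamp = timestamp;
            this.keepalive = keepalive;
        }
    }

    private final long bound; // max out-of-orderness in ms
    private long maxTimestamp;

    KeepaliveWatermarks(long bound) {
        this.bound = bound;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + bound + 1;
    }

    // Data events and keepalives both move event time forward.
    void onEvent(Event e) {
        maxTimestamp = Math.max(maxTimestamp, e.timestamp);
    }

    long currentWatermark() {
        return maxTimestamp - bound - 1;
    }

    // Keepalives should be dropped before they reach any window.
    static boolean isData(Event e) {
        return !e.keepalive;
    }
}
```

Because the producer emits the keepalive, its timestamp is a trustworthy statement that no earlier data is still coming, which is what makes this safer than guessing from silence.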

If that's not possible, and if you have some sources that aren't idle, then you could put a rebalance() in front of the BoundedOutOfOrdernessTimestampExtractor (and before the keyBy), so that every instance continues to receive some events and can advance its watermark. This comes at the expense of an extra network shuffle.
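A toy simulation of why rebalance() helps (plain Java, hypothetical names, and a simplified round-robin that always starts at subtask 0). Without rebalance, each timestamp-assigner subtask only sees the events of the source subtask in front of it, so an idle source leaves one assigner with no watermark. With round-robin distribution every assigner gets some of the busy source's events. In Flink terms this corresponds to something like stream.rebalance().assignTimestampsAndWatermarks(...).

```java
import java.util.List;

// Toy model: does each timestamp-assigner subtask get events, with and without rebalance()?
final class RebalanceDemo {

    // perSource.get(i) holds the event timestamps read by source subtask i.
    // Returns the minimum watermark over the assigner subtasks (one per source subtask).
    static long minWatermark(List<long[]> perSource, boolean rebalance, long bound) {
        int n = perSource.size();
        long[] maxTs = new long[n];
        boolean[] seen = new boolean[n];
        int next = 0; // simplified round-robin pointer, always starting at subtask 0

        for (int src = 0; src < n; src++) {
            for (long ts : perSource.get(src)) {
                // forward: stay on the same subtask; rebalance: round-robin over all subtasks
                int target = rebalance ? next++ % n : src;
                maxTs[target] = seen[target] ? Math.max(maxTs[target], ts) : ts;
                seen[target] = true;
            }
        }

        long min = Long.MAX_VALUE;
        for (int i = 0; i < n; i++) {
            // An assigner that never saw an event has no watermark (Long.MIN_VALUE).
            long wm = seen[i] ? maxTs[i] - bound - 1 : Long.MIN_VALUE;
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

With one busy source subtask and one idle one, the forward case stays at Long.MIN_VALUE while the rebalanced case advances; the price is the extra shuffle mentioned above.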

Perhaps the most commonly used solution is to use a watermark generator that detects idleness and artificially advances the watermark based on a processing time timer. ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor is an example of that.
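A sketch of the processing-time fallback, written as plain Java with the current wall-clock time passed in so that it can be tested. It is a hypothetical illustration of the idea, not the actual code of ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor. Once no event has arrived for longer than idleTimeout, the watermark is advanced by the extra processing time that has passed, on the assumption that event time roughly tracks wall-clock time.

```java
// Hypothetical sketch of a watermark generator with a processing-time fallback for idleness.
final class IdleAwareWatermarks {
    private final long bound;        // max out-of-orderness (ms)
    private final long idleTimeout;  // how long the source may be quiet before we step in (ms)
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcTime;
    private long watermark = Long.MIN_VALUE;

    IdleAwareWatermarks(long bound, long idleTimeout) {
        this.bound = bound;
        this.idleTimeout = idleTimeout;
    }

    void onEvent(long eventTimestamp, long nowProcTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcTime = nowProcTime;
    }

    // Called on every auto-watermark interval with the current wall-clock time.
    long onPeriodicEmit(long nowProcTime) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return watermark; // no events yet, so there is nothing to trail
        }
        long candidate = maxTimestamp - bound - 1;
        long idleFor = nowProcTime - lastEventProcTime;
        if (idleFor > idleTimeout) {
            // Assume event time keeps pace with processing time while the source is silent.
            candidate += idleFor - idleTimeout;
        }
        watermark = Math.max(watermark, candidate); // watermarks must never go backwards
        return watermark;
    }
}
```

The trade-off is that events arriving after such an artificial advance may be late, and the window operator drops them unless they fall within the allowed lateness.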

1
votes

Flink 1.11 introduced a watermark strategy with idleness detection (WatermarkStrategy#withIdleness, implemented by WatermarksWithIdleness). Flink ignores inputs that are marked idle when calculating the minimum watermark, so only the partitions that still have data are considered. https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.html
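A toy model of how idleness changes the combination rule (plain Java, not Flink's internal implementation): inputs flagged idle are skipped when taking the minimum. Note what it does not do: marking an input idle does not by itself move the watermark forward. If every input is idle, the watermark simply stays where it was until events arrive again.

```java
// Toy model of combining input watermarks when some inputs are marked idle.
final class IdleAwareMinDemo {

    // Returns the operator's new watermark given its previous one.
    static long combine(long[] inputWatermarks, boolean[] idle, long previous) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {               // idle inputs are left out of the minimum
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        if (!anyActive) {
            return previous;              // all inputs idle: nothing moves
        }
        return Math.max(previous, min);   // watermarks never go backwards
    }
}
```

So withIdleness helps when at least one input is still producing data; with a single, completely quiet input, windows will still wait for the next event.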

0
votes

I have the same issue: a source that may be inactive for a long time.
The solution below is based on WatermarksWithIdleness.

It is a standalone Flink job that demonstrates the concept.

package com.demo.playground.flink.sleepysrc;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;


public class SleepyJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EventGenerator eventGenerator = new EventGenerator();
        WatermarkStrategy<Event> strategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofSeconds(Constants.IDLE_TIME_SEC))
                .withTimestampAssigner((event, timestamp) -> event.timestamp);
        final DataStream<Event> events = env
                .addSource(eventGenerator)
                .assignTimestampsAndWatermarks(strategy);
        KeyedStream<Event, String> eventStringKeyedStream = events.keyBy((Event event) -> event.id);
        WindowedStream<Event, String, TimeWindow> windowedStream = eventStringKeyedStream
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(Constants.SESSION_WINDOW_GAP)))
                .allowedLateness(Time.milliseconds(1000));
        SingleOutputStreamOperator<Object> result = windowedStream.process(new ProcessWindowFunction<Event, Object, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Event> events, Collector<Object> collector) {
                int counter = 0;
                for (Event e : events) {
                    Utils.print(++counter + ") inside process: " + e);
                }
                Utils.print("--- Process Done ----");
            }
        });
        result.print();
        env.execute("Sleepy flink src demo");
    }


    private static class Event {
        public Event(String id) {
            this.timestamp = System.currentTimeMillis();
            this.eventData = "not_important_" + this.timestamp;
            this.id = id;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "id=" + id +
                    ", timestamp=" + timestamp +
                    ", eventData='" + eventData + '\'' +
                    '}';
        }

        public String id;
        public long timestamp;
        public String eventData;
    }

    private static class EventGenerator implements SourceFunction<Event> {

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            /**
             *  Here is the sleepy source: after NUM_OF_EVENTS events are collected, the code sleeps for SHORT_SLEEP_TIME ms.
             *  We would like to detect this inactivity and FIRE the window
             */
            int counter = 0;
            while (running) {
                String id = Long.toString(System.currentTimeMillis());
                Utils.print(String.format("Generating %d events with id %s", 2 * Constants.NUM_OF_EVENTS, id));
                while (counter < Constants.NUM_OF_EVENTS) {
                    Event event = new Event(id);
                    ctx.collect(event);
                    counter++;
                    Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
                }
                // here we create a delay:
                // a time of inactivity where
                // we would like to FIRE the window
                Thread.sleep(Constants.SHORT_SLEEP_TIME);
                counter = 0;
                while (counter < Constants.NUM_OF_EVENTS) {
                    Event event = new Event(id);
                    ctx.collect(event);
                    counter++;
                    Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
                }
                Thread.sleep(Constants.LONG_SLEEP_TIME);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }

        private volatile boolean running = true;

    }

    private static final class Constants {
        public static final int VERY_SHORT_SLEEP_TIME = 300;
        public static final int SHORT_SLEEP_TIME = 8000;
        public static final int IDLE_TIME_SEC = 5;
        public static final int LONG_SLEEP_TIME = SHORT_SLEEP_TIME * 5;
        public static final long SESSION_WINDOW_GAP = 60 * 1000;
        public static final int NUM_OF_EVENTS = 4;
    }

    private static final class Utils {
        public static void print(Object obj) {
            System.out.println(new java.util.Date() + " > " + obj);
        }
    }
}
0
votes

For others: if you're using Kafka, make sure there's data coming out of all of your topics' partitions.

I know it sounds dumb, but in my case I had a single source and the problem still happened, because I was testing with very little data in a single Kafka topic (a single source) that had 10 partitions. The dataset was so small that some of the topic's partitions had no records at all, and although I had only one source (the one topic), Flink did not advance the watermark. This is most likely because the Kafka source tracks watermarks per partition and emits the minimum across them, so a partition with no data holds the watermark back.

The moment I switched my source to a topic with a single partition, the watermark started to advance.
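A toy model of this situation (plain Java, hypothetical names): each partition contributes its own watermark and the source emits the minimum, so a partition that has never delivered a record keeps the whole source at "No Watermark". The emptyPartitions helper is only an illustrative debugging aid.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a Kafka source's watermark as the minimum of per-partition watermarks.
final class PartitionWatermarkDemo {
    static final long NO_DATA = Long.MIN_VALUE;

    // maxTs[p] = largest event timestamp read from partition p, or NO_DATA if none yet.
    static long sourceWatermark(long[] maxTs, long bound) {
        long min = Long.MAX_VALUE;
        for (long ts : maxTs) {
            long wm = (ts == NO_DATA) ? Long.MIN_VALUE : ts - bound - 1;
            min = Math.min(min, wm);
        }
        return min;
    }

    // Partitions that have never produced a record, i.e. the ones holding the watermark back.
    static List<Integer> emptyPartitions(long[] maxTs) {
        List<Integer> empty = new ArrayList<>();
        for (int p = 0; p < maxTs.length; p++) {
            if (maxTs[p] == NO_DATA) {
                empty.add(p);
            }
        }
        return empty;
    }
}
```

With a single partition there is nothing else to wait for, which matches the behaviour described above. The Flink Kafka connector documentation suggests adding an idle timeout to the watermark strategy (withIdleness(...)) when some partitions may stay empty.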