
I am trying to calculate the rate of incoming events per minute from a Kafka topic based on event time. I am using TumblingEventTimeWindows of 1 minute for this. The code snippet is given below.

I have observed that if I am not receiving any events for a particular window, e.g. from 2:34 to 2:35, then the previous window of 2:33 to 2:34 does not get closed. I understand the risk of losing data for the window of 2:33 to 2:34 (it may happen due to system failure, a large Kafka lag, etc.), but I cannot wait indefinitely. I need to close this window after waiting for a certain period of time, and subsequent windows can continue after the system recovers. How can I achieve this?

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = executionEnvironment.addSource(kafkaConsumer);
    kafkaDataStream
            .flatMap(new EventFlatter())
            .filter(Objects::nonNull)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Entity>forMonotonousTimestamps()
                    .withIdleness(Duration.ofSeconds(60))
                    .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
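            // note: the second assignTimestampsAndWatermarks call below replaces the
            // strategy (and its withIdleness setting) assigned just above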
            .assignTimestampsAndWatermarks(new EntityWatermarkStrategy())
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .aggregate(new EventCountAggregator())
            .addSink(eventRateProducer);

private static class EntityWatermarkStrategy implements WatermarkStrategy<Entity> {
    @Override
    public WatermarkGenerator<Entity> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new EntityWatermarkGenerator();
    }
}

private static class EntityWatermarkGenerator implements WatermarkGenerator<Entity> {

    private long maxTimestamp;

    public EntityWatermarkGenerator() {
        this.maxTimestamp = Long.MIN_VALUE + 1;
    }

    @Override
    public void onEvent(Entity event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
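        // note: this only ever advances based on timestamps already observed;
        // when no events arrive, the watermark stalls and open windows never close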
        output.emitWatermark(new Watermark(maxTimestamp + 2));
    }
}

Also, I tried adding some custom triggers, but it didn't help. I am using Apache Flink 1.11.

Can somebody suggest what I am doing wrong?

When I push some more data with a newer timestamp (say t+1) to the topic, the data from the earlier timeframe (t) gets pushed downstream. But then for the t+1 data, the same issue occurs as for t.

Hi Parag. That seems to work as expected, no? When doing event-time processing, the only way Flink can know that time is moving forward is by reading more recent timestamps in new inbound data. The fact that it can take a long processing time for that to happen is not relevant here: from Flink's point of view there is no way to distinguish the absence of data due to a technical issue from its simple non-existence. – Svend

@Svend I'm not sure here. If we are using a tumbling window of 1 (or n) minutes, after expiry of that window it should advance the watermark, so that if anything is stuck/hung, it can be pushed downstream for further processing. E.g. if the Kafka broker went down due to some issue, Flink will not receive events for that duration. But events/messages that were pushed just before the failure of Kafka and just before the expiry of the tumbling window will be held by Flink itself. In this case that would not be the desired behavior. – Parag Somani

I have added a workaround, i.e. pushing a dummy event at regular intervals, so that if something gets stuck or idle for a long duration, it is flushed and data is pushed downstream. It works in unit tests, but I don't think it's a good solution. – Parag Somani

> "If we are using a tumbling window of 1 (or n) minutes, after expiry of that window it should advance the watermark." My understanding is that, if we are speaking about an event-time tumbling window, expiry of that window can only happen in event time, i.e. after some new timestamp has been observed in the data itself. The fact that some processing time is passing has no influence on the closing of an event-time tumbling window. My understanding is that this is by design. – Svend

I have put a workaround in place by sending a dummy event on each partition of the Kafka topic at regular intervals. – Parag Somani

3 Answers

1 vote

One reason why withIdleness() isn't helping in your case is that you are calling assignTimestampsAndWatermarks on the datastream after it has been emitted by the Kafka source, rather than calling it on the FlinkKafkaConsumer itself. If you do the latter, the FlinkKafkaConsumer will assign timestamps and watermarks on a per-partition basis, and will consider idleness at the granularity of each individual Kafka partition. See Watermark Strategies and the Kafka Connector for more info.

To make this work, however, you'll need to use a deserializer other than a SimpleStringSchema (such as a KafkaDeserializationSchema) that is able to create individual stream records, with timestamps. See https://stackoverflow.com/a/62072265/2000823 for an example of how to implement a KafkaDeserializationSchema.
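
For illustration, a minimal sketch of that wiring, where EntityDeserializationSchema is a hypothetical KafkaDeserializationSchema<Entity> along the lines of the linked answer:

// Sketch only: EntityDeserializationSchema is hypothetical; it would parse each
// Kafka record into an Entity carrying its own event timestamp.
FlinkKafkaConsumer<Entity> kafkaConsumer = new FlinkKafkaConsumer<>(
        "event_input_topic", new EntityDeserializationSchema(), properties);

// Assigning the strategy on the consumer (rather than on the stream) enables
// per-partition watermarking, so withIdleness() is evaluated per Kafka partition.
kafkaConsumer.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Entity>forMonotonousTimestamps()
                .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp())
                .withIdleness(Duration.ofSeconds(60)));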

Keep in mind, however, that withIdleness() will not advance the watermark if all partitions are idle. What it does is prevent idle partitions from holding back the watermark, which may then advance if there are events from other partitions.

0 votes

See the idle partitions documentation for an approach to solving your problem.

0 votes

Using the Flink 1.11+ WatermarkStrategy API should help you avoid pumping dummy data. What you need is to generate a watermark periodically at the end of each minute, whether or not events arrive. This is the reference:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html

Create a FlinkKafkaConsumer with a CustomKafkaSerializer:

FlinkKafkaConsumer<Tuple3<String, String, String>> otherConsumer = new FlinkKafkaConsumer<>(
        topics, new CustomKafkaSerializer(apacheFlinkEnvironmentLoader), props);

How do you create the CustomKafkaSerializer? See Two questions about Flink deserializing.
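
For reference, a rough sketch of what such a schema could look like, assuming (as in the linked answer) that the Tuple3 carries the record's topic, key, and value; the apacheFlinkEnvironmentLoader constructor argument from above is omitted here:

// Hypothetical sketch of a KafkaDeserializationSchema producing (topic, key, value).
public class CustomKafkaSerializer implements KafkaDeserializationSchema<Tuple3<String, String, String>> {

    @Override
    public boolean isEndOfStream(Tuple3<String, String, String> nextElement) {
        return false; // consume indefinitely
    }

    @Override
    public Tuple3<String, String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // decode the raw Kafka record into the (topic, key, value) tuple
        String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
        String value = new String(record.value(), StandardCharsets.UTF_8);
        return new Tuple3<>(record.topic(), key, value);
    }

    @Override
    public TypeInformation<Tuple3<String, String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple3<String, String, String>>() {});
    }
}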

Now use the watermark strategy for this FlinkKafkaConsumer:

FlinkKafkaConsumer<Tuple3<String, String, String>> flinkKafkaConsumer = apacheKafkaConfig.getOtherConsumer();
flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new ApacheFlinkWaterMarkStrategy(envConfig.getOutOfOrderDurationSeconds())
                .withIdleness(Duration.ofSeconds(envConfig.getIdlePartitionTimeout())));

This is what the watermark strategy looks like:

public class ApacheFlinkWaterMarkStrategy implements WatermarkStrategy<Tuple3<String, String, String>> {

    private long outOfOrderDuration;

    public ApacheFlinkWaterMarkStrategy(long outOfOrderDuration)
    {
        super();
        this.outOfOrderDuration = outOfOrderDuration;
    }

    @Override
    public TimestampAssigner<Tuple3<String, String, String>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new ApacheFlinkTimeForEvent();
    }

    @Override
    public WatermarkGenerator<Tuple3<String, String, String>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new ApacheFlinkWaterMarkGenerator(this.outOfOrderDuration);
    }
}

This is how we get the event time from the payload:

public class ApacheFlinkTimeForEvent implements SerializableTimestampAssigner<Tuple3<String,String,String>> {

    public static final Logger logger = LoggerFactory.getLogger(ApacheFlinkTimeForEvent.class);

    private static final FhirContext fhirContext = FhirContext.forR4();


    @Override
    public long extractTimestamp(Tuple3<String, String, String> element, long recordTimestamp) {
        // get the timestamp from the payload and return it as epoch milliseconds
    }
}

This is how we generate watermarks periodically, so that the watermark gets updated in each partition at every auto-watermark interval, irrespective of whether data arrives:

public class ApacheFlinkWaterMarkGenerator implements WatermarkGenerator<Tuple3<String,String,String>> {

    public static final Logger logger = LoggerFactory.getLogger(ApacheFlinkWaterMarkGenerator.class);

    private long outOfOrderGenerator;

    private long maxEventTimeStamp;

    public ApacheFlinkWaterMarkGenerator(long outOfOrderGenerator)
    {
        super();
        this.outOfOrderGenerator = outOfOrderGenerator;
    }

    @Override
    public void onEvent(Tuple3<String, String, String> stringStringStringTuple3, long l, WatermarkOutput watermarkOutput) {
        maxEventTimeStamp = Math.max(maxEventTimeStamp,l);
        Watermark eventWatermark = new Watermark(maxEventTimeStamp);
        watermarkOutput.emitWatermark(eventWatermark);
        logger.info("Current Watermark emitted from event is {}",eventWatermark.getFormattedTimestamp());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
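        // wall-clock based: derived from processing time, so the watermark keeps
        // advancing even when no events arrive, letting idle windows close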

        long currentUtcTime = Instant.now().toEpochMilli();
        Watermark periodicWaterMark = new Watermark(currentUtcTime-outOfOrderGenerator);
        watermarkOutput.emitWatermark(periodicWaterMark);
        logger.info("Current Watermark emitted periodically is {}",periodicWaterMark.getFormattedTimestamp());

    }
}

Also, the periodic emitting of watermarks has to be configured at the start of the application:

streamExecutionEnvironment.getConfig().setAutoWatermarkInterval(autoWatermarkIntervalMillis); // in milliseconds, e.g. 60_000L for once per minute

This is how we add the custom watermarks and timestamps to the FlinkKafkaConsumer:

flinkKafkaConsumer.assignTimestampsAndWatermarks(
        new ApacheFlinkWaterMarkStrategy(outOfOrderSeconds)
                .withIdleness(Duration.ofSeconds(idlePartitionSeconds)));