
In Flink, I found two ways to set up watermarks.

The first is:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(5000)

The second is:

env.addSource(
    new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[...](Duration.ofSeconds(10))
      .withTimestampAssigner(...)
)

I would like to know which one eventually takes effect.

Answer

There's no conflict at all between those two; they deal with separate concerns. Everything specified will take effect.

The first one,

env.getConfig.setAutoWatermarkInterval(5000)

specifies how often watermarks are generated (one watermark every 5000 ms). If it weren't set, the default of 200 ms would be used instead.
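To make "how often" concrete, here is a plain-Scala sketch with no Flink dependency. It only models the schedule: Flink's runtime calls the watermark generator's periodic hook once per interval, and this helper lists when those calls would happen over a run of a given length. The object and method names are hypothetical, not Flink APIs.

```scala
object PeriodicEmitSketch {
  // Hypothetical helper (not a Flink API): processing-time offsets, in ms,
  // at which a periodic watermark hook would fire during a run of runMillis,
  // given an auto-watermark interval of intervalMillis.
  def emitTimes(intervalMillis: Long, runMillis: Long): List[Long] = {
    require(intervalMillis > 0, "interval must be positive")
    (intervalMillis to runMillis by intervalMillis).toList
  }

  def main(args: Array[String]): Unit = {
    println(emitTimes(5000L, 20000L))      // List(5000, 10000, 15000, 20000)
    println(emitTimes(5000L, 60000L).size) // 12 ticks per minute with setAutoWatermarkInterval(5000)
    println(emitTimes(200L, 60000L).size)  // 300 ticks per minute with the 200 ms default
  }
}
```

A longer interval means fewer watermark updates, so event-time windows may fire somewhat later. It has no effect on how each watermark value is computed.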

The second,

env.addSource(
    new FlinkKafkaConsumer[...](...)
).assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[...](Duration.ofSeconds(10))
      .withTimestampAssigner(...)
)

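specifies how those watermarks are computed: they are generated right after the FlinkKafkaConsumer source using a bounded-out-of-orderness strategy, which assumes events arrive at most 10 seconds out of order. (Calling assignTimestampsAndWatermarks on the FlinkKafkaConsumer itself, before passing it to addSource, would instead have the consumer generate watermarks per Kafka partition.) The WatermarkStrategy also needs a timestamp assigner, which extracts the event-time timestamp from each record.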
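The computation itself can be modeled in a few lines of plain Scala. This is a sketch, not Flink code. The class name `BoundedOutOfOrdernessModel` is hypothetical. To the best of my knowledge, it follows the same logic as Flink's built-in bounded-out-of-orderness generator: track the highest timestamp seen so far, and on each periodic tick report that maximum minus the bound minus 1 ms.

```scala
// Plain-Scala model of bounded-out-of-orderness watermarking.
// The class name is hypothetical; no Flink API is used here.
final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMillis: Long) {
  // Start low enough that the watermark before any event is Long.MinValue.
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMillis + 1

  // Called for every event, with the timestamp the timestamp assigner extracted.
  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // What a periodic tick (every auto-watermark interval) would report.
  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMillis - 1
}

object BoundedOutOfOrdernessDemo {
  def main(args: Array[String]): Unit = {
    val gen = new BoundedOutOfOrdernessModel(10000L) // Duration.ofSeconds(10)
    println(gen.currentWatermark == Long.MinValue)   // true: no events yet
    Seq(100000L, 95000L, 112000L, 108000L).foreach(gen.onEvent)
    println(gen.currentWatermark)                    // 101999
  }
}
```

Here the event at 108000 arrives after the one at 112000 but is still above the watermark, so it is not late. An event with a timestamp at or below 101999 that arrives from this point on would be late.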

There's no default WatermarkStrategy, so something like this second code snippet is required if you want to work with event time.