4
votes

I'm following Flink's documentation on how to use WatermarkStrategy with KafkaConsumer. The code is shown below

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

val stream: DataStream[MyType] = env.addSource(kafkaSource)

Anytime I try to compile the code above I get an error saying

error: overloaded method value assignTimestampsAndWatermarks with alternatives:

error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR]  cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR]         consumer.assignTimestampsAndWatermarks(
2
Which flink version are you using? Can you please paste the full eror message. - TobiSH
I added the error. I'm using flink 1.11.2 - Mayokun
Without testing on my own I would say you are missing a type here. There is a good example in one of the test-cases for the Watermarker assignment: github.com/apache/flink/blob/… . There you can see that the WatermarkStrategy takes a type (which would be MyType in your example). Hope that helps - TobiSH

2 Answers

1
votes

The code below returns WatermarkStrategyy[Nothing] instead of WatermarkStrategy[String]

  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

I solved this by using this code

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
watermark: Watermark[String] = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
kafkaSource.assignTimestampsAndWatermarks(watermark)
1
votes

@Mayokun is right. But to make the code simpler, you could put the type information right after the static method:

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props) 
kafkaSource.assignTimestampsAndWatermarks(
     WatermarkStrategy.forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20))
)