0
votes

I have a FlinkKafkaConsumer defined as follows FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties) and I'm working with event time by using setStreamTimeCharacteristic(TimeCharacteristic.EventTime).

Now I want to assign a periodic watermark with the function assignTimestampsAndWatermarks, but I don't know what I should pass to that function since in the documentation the example of this function receive an element of type MyType with a getCreationTime() and my consumer is of type String.

Is it possible to assign event time in this situation?

EDIT: The time I would want to use as event time is the time each register was stored in Kafka.

1

1 Answers

0
votes

The notion of EventTime is at least in the definition strictly connected with the time at which events are created rather than received. So, if the events that You are consuming from Kafka have some kind of timestamp (for example if You are consuming JSON as String and then parsing it) then you can use this timestamp inside the assignTimestampsAndWatermarks function.

If you are parsing plain String objects then the best thing You could do is to use custom KafkaDeserializationSchema to extract Kafka timestamp for each event and use this.

Technically, You could even use the counter that increases artificially timestamp for each record(for example by incrementing it by 1), but this doesn't seem to make sense in terms of EventTime processing.