I have a Flink job that listens to Kafka. The consumed messages are to be held in a concurrent hash map for a period of time and then need to be sunk to Cassandra.
The operator chain goes something like this:
DataStream<Message> datastream = KafkaSource.createSource();
DataStream<Message> decodedMessage = datastream.flatMap(new DecodeMessage());

decodedMessage
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Message>() {
        @Override
        public long extractAscendingTimestamp(Message m) {
            return m.getTimestamp();
        }
    })
    .keyBy((KeySelector<Message, Integer>) x -> x.getID())
    .process(new TimerFunction())
    .addSink(new MySink());
class TimerFunction extends KeyedProcessFunction<Integer, Message, Message> {
    private ValueState<Message> x;

    @Override
    public void processElement(Message message, Context context, Collector<Message> out) throws Exception {
        // buffer the message in state (logic omitted), then round its event timestamp
        // up to the next full minute and register an event-time timer for it
        long nextMinute = message.getTimestamp() - (message.getTimestamp() % 60_000) + 60_000;
        context.timerService().registerEventTimeTimer(nextMinute);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Message> out) throws Exception {
        // output buffered values on trigger
    }
}
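KafkaSource.createSource() above is just my shorthand; on Flink 1.8 the source is wired up with a FlinkKafkaConsumer, roughly like the sketch below (topic name, properties and MessageDeserializationSchema are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// event-time timers depend on this characteristic plus watermarks from the assigner above
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
props.setProperty("group.id", "message-consumer");         // placeholder

// universal Kafka connector; MessageDeserializationSchema is a placeholder DeserializationSchema<Message>
FlinkKafkaConsumer<Message> consumer =
        new FlinkKafkaConsumer<>("messages-topic", new MessageDeserializationSchema(), props);

DataStream<Message> datastream = env.addSource(consumer);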
I have a few doubts while working with event time:
- Each Message will have a unique ID, a timestamp and some other attributes. There could be a million unique keys in a minute. Will the keyBy operation affect performance?
I need to cover a scenario like the one below:
Message X with ID 1 arrives at 08:01:01
Message Y with ID 2 arrives at 08:01:04
Since I'm using the ID as the key, both of these messages should have a timer set to trigger at 08:02:00. As per the Flink documentation, if timers have the same timestamp they are triggered just once. The problem I'm facing is that when the source becomes idle for a few minutes, the timers keep waiting for the next watermark and never fire. How do I deal with an idle source?
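Would something like the sketch below be a reasonable way to handle the idle source on 1.8? It would replace the AscendingTimestampExtractor with a custom AssignerWithPeriodicWatermarks that pushes the watermark forward from the wall clock when no elements have arrived for a while (the class name and the idle timeout are placeholders I made up):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<Message> {

    private final long idleTimeoutMs;              // e.g. 30_000
    private long maxTimestamp = Long.MIN_VALUE;    // highest event timestamp seen so far
    private long lastElementWallClock = System.currentTimeMillis();

    public IdleAwareWatermarkAssigner(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    @Override
    public long extractTimestamp(Message element, long previousElementTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, element.getTimestamp());
        lastElementWallClock = System.currentTimeMillis();
        return element.getTimestamp();
    }

    @Override
    public Watermark getCurrentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE);  // nothing seen yet
        }
        long idleFor = System.currentTimeMillis() - lastElementWallClock;
        if (idleFor > idleTimeoutMs) {
            // source looks idle: advance the watermark by the elapsed wall-clock time
            // so the pending one-minute timers still fire
            return new Watermark(maxTimestamp + idleFor);
        }
        return new Watermark(maxTimestamp - 1);
    }
}

As far as I understand, getCurrentWatermark() is called at the interval set via env.getConfig().setAutoWatermarkInterval(...), so it keeps running even while Kafka is quiet.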
Is using processing time a better option in this case?
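In other words, would switching the TimerFunction to processing-time timers, roughly like the sketch below, be the better approach? (The class name is made up and the buffering/state logic is omitted.)

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

class ProcessingTimeTimerFunction extends KeyedProcessFunction<Integer, Message, Message> {

    @Override
    public void processElement(Message message, Context context, Collector<Message> out) throws Exception {
        // round the current processing time up to the next full minute and register a
        // processing-time timer; firing no longer depends on watermarks from the source
        long now = context.timerService().currentProcessingTime();
        long nextMinute = now - (now % 60_000) + 60_000;
        context.timerService().registerProcessingTimeTimer(nextMinute);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Message> out) throws Exception {
        // output buffered values on trigger (same as before)
    }
}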
Also, I'm restricted to Flink v1.8, so I'd need information that applies to that version.
Thanks in advance.