
I have a Flink job that listens to Kafka. The consumed messages are to be saved in a concurrent hash map for a period of time and then need to be sunk to Cassandra.

The operator chain goes something like this:

DataStream<Message> dataStream = KafkaSource.createSource();
DataStream<Message> decodedMessage = dataStream.flatMap(new DecodeMessage());

decodedMessage
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Message>() {

        @Override
        public long extractAscendingTimestamp(Message m) {
            return m.getTimestamp();
        }
    })
    .keyBy((KeySelector<Message, Integer>) x -> x.getID())
    .process(new TimerFunction())
    .addSink(new MySink());



class TimerFunction extends KeyedProcessFunction<Integer, Message, Message> {

    private ValueState<Message> x;

    @Override
    public void processElement(Message message, Context ctx, Collector<Message> out) throws Exception {
        // some logic to compute the timer timestamp for the next minute
        ctx.timerService().registerEventTimeTimer(x.value().getTimestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Message> out) throws Exception {
        // output values on trigger
    }
}
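To line all keys up on the same minute boundary (as in the 8h 2min 0sec scenario described further down), one option is to round each event timestamp up to the next full minute before registering the timer. A minimal sketch, where the class and method names are my own, not Flink API:

```java
// Illustrative helper (not part of Flink): round an epoch-millisecond
// timestamp up to the next full minute, so messages arriving at 08:01:01
// and 08:01:04 both register a timer for 08:02:00. Flink deduplicates
// timers per key that share the same timestamp.
class MinuteBoundary {

    static long nextMinute(long epochMillis) {
        return (epochMillis / 60_000 + 1) * 60_000;
    }

    public static void main(String[] args) {
        long t1 = 61_000L; // 00:01:01
        long t2 = 64_000L; // 00:01:04
        // Both round up to the same boundary: 00:02:00 (120000 ms)
        System.out.println(MinuteBoundary.nextMinute(t1)); // 120000
        System.out.println(MinuteBoundary.nextMinute(t2)); // 120000
    }
}
```

Inside `processElement`, this would be used as `ctx.timerService().registerEventTimeTimer(MinuteBoundary.nextMinute(message.getTimestamp()))`.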


I ran into some doubts while working with event time:

  • A Message has a unique ID, a timestamp, and some other attributes. There could be a million unique keys in a minute. Will the keyBy operation affect performance?

I need to cover a scenario as below

  • X Message with ID 1 arrives at 8h 1min 1sec

  • Y Message with ID 2 arrives at 8h 1min 4sec

    Since I'm using the ID as the key, both of these messages should have a timer set to trigger at 8h 2min 0sec. As per the Flink documentation, if the timestamps of two timers are the same, the timer is triggered just once. I'm also facing a problem: when the source becomes idle for a few minutes, the timer keeps waiting for the next watermark and never triggers. How do I deal with an idle source?

  • Is using processing time a better option in this case?

  • Also, I'm restricted to Flink v1.8, so I would need information with respect to that version.

Thanks in advance.

1 Answer


I don't fully understand your question; there's too much context missing. But I can offer a few points:

(1) keyBy is expensive: it forces serialization/deserialization along with a network shuffle.

(2) Timers are deduplicated if and only if they are for the same timestamp and the same key.

(3) As for the idle source problem: the event-time timers will eventually fire once events begin to flow again, as that will advance the watermark(s). If you can't wait, you can use something like https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java, or switch to processing time.
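The core idea behind that extractor can be sketched in plain Java (all names below are mine, not a Flink API): track the maximum event timestamp and the processing time of the last event; while events flow, emit the usual bounded-out-of-orderness watermark, and once the source has been idle longer than a timeout, let the wall clock push the watermark forward so pending event-time timers still fire. In Flink 1.8 this logic would be wired into an `AssignerWithPeriodicWatermarks`, whose `getCurrentWatermark()` is called on each auto-watermark interval.

```java
// Sketch of idle-aware watermark logic (illustrative class, not a Flink
// class). onEvent() is called per record with the current processing
// time; currentWatermark() is called periodically.
class IdleAwareWatermark {

    private final long maxOutOfOrderness; // allowed event-time lateness, ms
    private final long idleTimeout;       // idleness threshold, ms

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcTime = Long.MIN_VALUE;

    IdleAwareWatermark(long maxOutOfOrderness, long idleTimeout) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.idleTimeout = idleTimeout;
    }

    // Record an event's timestamp and the processing time at which it arrived.
    void onEvent(long eventTimestamp, long procTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcTime = procTime;
    }

    // Watermark to emit at the given processing time.
    long currentWatermark(long procTime) {
        if (lastEventProcTime == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet
        }
        long idleFor = procTime - lastEventProcTime;
        if (idleFor > idleTimeout) {
            // Source is idle: advance the watermark with the wall clock
            // so already-registered event-time timers can still fire.
            return maxTimestamp - maxOutOfOrderness + (idleFor - idleTimeout);
        }
        return maxTimestamp - maxOutOfOrderness;
    }
}
```

While the source is active this behaves like a normal bounded-out-of-orderness watermark; during idleness the watermark keeps advancing at wall-clock speed, so a timer registered for the next minute boundary eventually fires even if no further events arrive.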