0
votes

I'm setting up Apache Flink connected with Kafka broker.

I got a the following messages in random order:

  • message(timestamp=[..], index=1, someData=[..])
  • message(timestamp=[..], index=2, someData=[..])
  • message(timestamp=[..], index=3, someData=[..])
  • message(timestamp=[..], index=2, someData=[..])
  • message(timestamp=[..], index=3, someData=[..])
  • message(timestamp=[..], index=1, someData=[..])

My records coming from Kafka got a index field.

In my application, i need to compute last two of these records that have a same id and then immediately send the response.

For example, these two:

  • message(timestamp=[..], index=1, someData=[..])
  • message(timestamp=[..], index=1, someData=[..])

Whats the best way to store and compute last two records with the same index field? Could you tell me some tips?

1
When you say "last 2 events", is it according to event-time (timestamp field) or processing-time?Rafi Aroch

1 Answers

0
votes

Your requirements aren't entirely clear, but the mechanisms you probably want to understand are using keyBy(e -> e.index) to group/partition the stream by the index field, and keyed state for remembering the last event (or two) for each value of the index.

If you need to take the timestamps into account, and the event stream is out-of-order, even within a single value of the index, then you will need to first sort the stream by the timestamps. In that case you'll have a much easier time if you use Flink SQL to do the sorting, and then you could use match_recognize to do the pattern recognition, though that's perhaps overkill for such a simple pattern. Here's an example of how to do the sorting.