Is there any way to sort records within a window using the Kafka Streams DSL or the Processor API?

Consider the following example (an arbitrary one, but similar to what I need):

  1. There is a Kafka topic of events, say user clicks, with 10 partitions. Messages are partitioned by key, but each key is unique, so the partitioning is effectively random. Each record contains a user ID, which is used later to repartition the stream.

  2. We consume the stream and publish each message to another topic, partitioning the record by its user ID (that is, we repartition the original stream by user ID).

  3. Then we consume this repartitioned stream and store the consumed records in a local state store, windowed by 10 minutes. All clicks of a particular user are always in the same partition, but their order is not guaranteed, because they were originally spread across the 10 partitions of the source topic.

  4. I understand the windowing model of Kafka Streams and that time advances only when new records come in. However, I need this window to use processing time, not event time, and when the window expires, I need to sort the buffered events and emit them in that order to another topic.
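The sorting part of step 4 is plain Java once a window's events are in hand. A minimal sketch, assuming a hypothetical `Click` type that carries the user ID, an event timestamp, and the unique source-topic key as `clickId` (none of these names come from the question): events are ordered by event time, and the unique key breaks ties so the emitted order is deterministic no matter how the repartitioned records interleaved on arrival.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortBufferedClicks {
    // Hypothetical click record; the field names are assumptions for illustration.
    static final class Click {
        final String userId;
        final long eventTimeMs;
        final String clickId; // the unique message key from the original topic

        Click(String userId, long eventTimeMs, String clickId) {
            this.userId = userId;
            this.eventTimeMs = eventTimeMs;
            this.clickId = clickId;
        }

        @Override
        public String toString() {
            return clickId + "@" + eventTimeMs;
        }
    }

    // Event time first; the unique key breaks ties so equal timestamps still sort deterministically.
    static final Comparator<Click> EMIT_ORDER =
            Comparator.comparingLong((Click c) -> c.eventTimeMs).thenComparing(c -> c.clickId);

    // Returns a sorted copy and leaves the buffered list untouched.
    static List<Click> sortForEmit(List<Click> buffered) {
        List<Click> sorted = new ArrayList<>(buffered);
        sorted.sort(EMIT_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        // Arrival order reflects interleaving from the 10 source partitions, not event time.
        List<Click> arrived = List.of(
                new Click("u1", 300, "c3"),
                new Click("u1", 100, "c1"),
                new Click("u1", 200, "c2b"),
                new Click("u1", 200, "c2a"));
        System.out.println(sortForEmit(arrived));
    }
}
```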

Note:

  1. We need to be able to flush/process records within the window using processing time, not event time. We can't wait for the next click to advance time, because it may never arrive.

  2. We need to remove all of a window's records from the store as soon as the window is sorted and flushed.

  3. If the application crashes, we need to recover (in the same or another instance of the application) and process all windows that were not yet processed, without waiting for new records to arrive for a particular user.
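These three requirements can be checked with a small model outside Kafka. In this sketch a nested `TreeMap` (window start → user ID → buffered events) stands in for a persistent, changelog-backed state store, and the class and method names are hypothetical. `add` assigns each record to a 10-minute window by arrival (processing) time; `flushExpired` is what a wall-clock punctuation would call. It scans every expired window in the store rather than only the users that just sent a record, emits each user's events sorted by event time, and then deletes them. Because the flush is driven by the store's contents, a replacement instance whose store has been restored picks up windows left behind by a crashed one on its first punctuation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;

final class WallClockWindowBuffer {
    static final long WINDOW_MS = 10 * 60 * 1000L; // 10-minute processing-time window

    static final class Event {
        final long eventTimeMs;
        final String clickId;

        Event(long eventTimeMs, String clickId) {
            this.eventTimeMs = eventTimeMs;
            this.clickId = clickId;
        }

        @Override
        public String toString() {
            return clickId + "@" + eventTimeMs;
        }
    }

    static final Comparator<Event> BY_EVENT_TIME =
            Comparator.comparingLong((Event e) -> e.eventTimeMs).thenComparing(e -> e.clickId);

    // Stand-in for a fault-tolerant store, ordered by window start so expired windows form a prefix.
    private final NavigableMap<Long, NavigableMap<String, List<Event>>> store;

    WallClockWindowBuffer(NavigableMap<Long, NavigableMap<String, List<Event>>> store) {
        this.store = store;
    }

    // Called per record: the window is chosen by arrival time (nowMs), not by event time.
    void add(String userId, long eventTimeMs, String clickId, long nowMs) {
        long windowStart = nowMs - (nowMs % WINDOW_MS);
        store.computeIfAbsent(windowStart, w -> new TreeMap<>())
             .computeIfAbsent(userId, u -> new ArrayList<>())
             .add(new Event(eventTimeMs, clickId));
    }

    // Called from a wall-clock punctuation; returns how many (window, user) buffers were flushed.
    int flushExpired(long nowMs, BiConsumer<String, List<Event>> emit) {
        // A window has ended when start + WINDOW_MS <= now, i.e. start <= now - WINDOW_MS.
        NavigableMap<Long, NavigableMap<String, List<Event>>> expired =
                store.headMap(nowMs - WINDOW_MS, true);
        int flushed = 0;
        for (NavigableMap<String, List<Event>> users : expired.values()) {
            for (Map.Entry<String, List<Event>> entry : users.entrySet()) {
                List<Event> events = entry.getValue();
                events.sort(BY_EVENT_TIME);
                emit.accept(entry.getKey(), events);
                flushed++;
            }
        }
        expired.clear(); // clearing the head view deletes the flushed windows from the backing store
        return flushed;
    }

    public static void main(String[] args) {
        NavigableMap<Long, NavigableMap<String, List<Event>>> durable = new TreeMap<>();
        WallClockWindowBuffer crashed = new WallClockWindowBuffer(durable);
        crashed.add("u1", 500, "c2", 1_000);
        crashed.add("u1", 400, "c1", 2_000);
        crashed.add("u2", 450, "c3", 3_000);
        // The first instance dies here without flushing; a replacement sees the restored store.
        WallClockWindowBuffer restarted = new WallClockWindowBuffer(durable);
        int flushed = restarted.flushExpired(WINDOW_MS, (user, events) -> System.out.println(user + " " + events));
        System.out.println(flushed + " windows flushed, store empty: " + durable.isEmpty());
    }
}
```

In a real topology, emitting the sorted records and deleting them from the store are separate writes, so whether a crash between the two can re-emit a window depends on the processing guarantee the application is configured with.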

I know Kafka Streams 1.0.0 allows using wall-clock time in the Processor API, but I'm not sure what the right way to implement this would be, especially given the recovery requirement described above.

I have exactly this requirement too. There's a bit of info in this post: stackoverflow.com/questions/38935904/… – Kyle Fransham

1 Answer

You can see my answer to a similar question here: https://stackoverflow.com/a/44345374/7897191

Since your message keys are already unique, you can ignore my comments about de-duplication.

Now that KIP-138 (wall-clock punctuation semantics) has been released in 1.0.0, you should be able to implement the outlined algorithm without issues. It uses the Processor API; I don't know of a way to do this with the DSL alone.
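What KIP-138 changes for requirement 1 is the distinction between the two punctuation types: stream time advances only as records arrive, while wall-clock punctuation is driven by system time. In Kafka Streams 1.0 a processor requests the latter with `context.schedule(intervalMs, PunctuationType.WALL_CLOCK_TIME, punctuator)`. The toy model below is not Kafka's scheduler (it fires at most once per check and does not reproduce how missed intervals are handled); it only illustrates why a window for a user who goes quiet is never flushed under stream-time punctuation but is under wall-clock punctuation.

```java
import java.util.ArrayList;
import java.util.List;

final class PunctuationModel {
    // Toy schedule: fires when the observed time reaches the next deadline, then reschedules.
    static final class Schedule {
        private final long intervalMs;
        private long nextFireMs;
        final List<Long> firedAt = new ArrayList<>();

        Schedule(long intervalMs, long startMs) {
            this.intervalMs = intervalMs;
            this.nextFireMs = startMs + intervalMs;
        }

        void check(long timeMs) {
            if (timeMs >= nextFireMs) {
                firedAt.add(timeMs);
                nextFireMs = timeMs + intervalMs;
            }
        }
    }

    public static void main(String[] args) {
        long interval = 10 * 60 * 1000L;
        Schedule streamTime = new Schedule(interval, 0);
        Schedule wallClock = new Schedule(interval, 0);

        // Stream time is the largest record timestamp seen, so it only moves when a record arrives.
        long streamTimeMs = 0;
        long[] recordTimestamps = {0, 1_000}; // two clicks, then the user goes quiet
        for (long ts : recordTimestamps) {
            streamTimeMs = Math.max(streamTimeMs, ts);
            streamTime.check(streamTimeMs);
        }

        // Wall-clock time keeps moving: simulate 30 minutes of polling with no new records.
        for (long now = 0; now <= 30 * 60 * 1000L; now += 60 * 1000L) {
            wallClock.check(now);
        }

        System.out.println("stream-time fired at " + streamTime.firedAt);
        System.out.println("wall-clock fired at " + wallClock.firedAt);
    }
}
```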