
We've started experimenting with Kafka to see if it can be used to aggregate our application data. I think our use case is a match for Kafka Streams, but we aren't sure if we are using the tool correctly. The proof of concept we've built seems to work as designed, but I'm not sure we are using the APIs appropriately.

Our proof of concept uses Kafka Streams to keep a running tally of information about a program in an output topic, e.g.

{ 
  "numberActive": 0, 
  "numberInactive": 0, 
  "lastLogin": "01-01-1970T00:00:00Z"  
}

Computing the tally is easy: it is essentially a compare-and-swap (CAS) operation based on the input topic and output field.

The local state contains the most recent program for a given key. We join an input stream against the state store and run the CAS operation using a TransformerSupplier, which explicitly writes the data to the state store using

store.put(...);
context.commit();
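
Roughly, a minimal sketch of what such a transformer might look like (ProgramEvent, ProgramTally, its apply() method, and the store name "tally-store" are simplified placeholders, not our real names):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class TallyTransformer
        implements Transformer<String, ProgramEvent, KeyValue<String, ProgramTally>> {

    private ProcessorContext context;
    private KeyValueStore<String, ProgramTally> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // The store must be registered with the topology under this name
        this.store = (KeyValueStore<String, ProgramTally>) context.getStateStore("tally-store");
    }

    @Override
    public KeyValue<String, ProgramTally> transform(String key, ProgramEvent event) {
        // "Compare": read the most recent tally for this key from local state
        ProgramTally current = store.get(key);
        if (current == null) {
            current = new ProgramTally();
        }
        // "Swap": fold the incoming event into the tally and write it back
        ProgramTally updated = current.apply(event);
        store.put(key, updated);
        // Forward the updated tally downstream to the output topic
        return KeyValue.pair(key, updated);
    }

    @Override
    public void close() {}
}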

Is this an appropriate use of the local state store? Is there another approach to keeping a stateful running tally in a topic?


1 Answer


Your design sounds right to me (I presume you are using the PAPI rather than the Streams DSL): you are reading in one stream and calling transform() on it, with a state store associated with the operator. Since your update logic appears to be key-dependent only, it can be embarrassingly parallelized by the Streams library based on key partitioning.
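
For reference, the wiring for that pattern would look roughly like this (the topic names, the store name "tally-store", the tallySerde, and TallyTransformer are placeholders standing in for your actual names):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Register the state store that the transformer reads and writes
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("tally-store"),
        Serdes.String(),
        tallySerde));   // tallySerde: a Serde<ProgramTally> you provide

// Attach the transformer to the input stream; records stay partitioned by key,
// so each stream task only ever updates the keys it owns
builder.<String, ProgramEvent>stream("program-events")
       .transform(TallyTransformer::new, "tally-store")
       .to("program-tallies", Produced.with(Serdes.String(), tallySerde));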

One thing to note: it seems you are calling context.commit() after every single put call, which is not a recommended pattern. This is because commit() is a pretty heavy operation that involves flushing the state store, sending a commit-offset request to the Kafka broker, and so on; calling it for every record would result in very low throughput. It is recommended to call commit() only after a batch of records has been processed, or you can simply rely on the Streams config "commit.interval.ms" and let the library call commit() internally at each time interval. Note that this will not affect your processing semantics upon graceful shutdown, since upon shutdown Streams will always enforce a final commit() call.
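
For example, instead of committing per record you can let the library commit on a timer (the application id, broker address, and interval below are illustrative):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "program-tally-app");  // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker address
// Commit progress (and flush state stores) every 10 seconds instead of per record
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

With this in place you can drop the per-record context.commit() call entirely; alternatively, keep a simple counter in the transformer and call context.commit() only every N records.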