0 votes

I am creating a Kafka Streams application that receives different JSON objects from different topics, and I want to implement some kind of wait function, but I'm not sure how best to implement it.

To simplify the problem, I'll use simplified entities in the following description; I hope the problem can be explained well with them. In one of my streams I receive car objects, and every car has an id. In a second stream I receive person objects, and every person also has a car id, which assigns the person to the car with that id.

With my Kafka Streams application I want to read from both input streams (topics) and enrich each car object with the four persons that have the same car id. A car object should only be forwarded to the next downstream processor once all four persons are included in it.
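For illustration, the simplified entities could look roughly like this (class, field, and method names are just placeholders I made up for this question, not my real implementation):

// Person.java
public class Person {
    private String id;
    private String carId;   // id of the car this person is assigned to

    public String getCarId() { return carId; }
}

// Car.java
import java.util.ArrayList;
import java.util.List;

public class Car {
    private String id;
    private List<Person> persons = new ArrayList<>();

    public String getId() { return id; }
    public List<Person> getPersons() { return persons; }
    public void addPerson(Person p) { persons.add(p); }
}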

I have planned to create an input stream for the car objects and one for the person objects, parse the JSON data into the internal object representation, merge both streams, and apply a selectKey function on the merged stream to extract the keys from the entities. After that I would push the data into a custom transform function that has a state store attached. Inside this transform function I would store every arriving car object under its id in the state store. As soon as new person objects arrive, I would add them to the respective car object in the state store (please ignore the case of late-arriving cars here). As soon as four persons are in a car object, I would forward it to the next stream function and remove the car object from the state store. A rough sketch of what I mean is shown below.
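This is only a sketch of the transform function I have in mind, assuming the hypothetical Car/Person classes above, a merged stream of type KStream&lt;String, Object&gt; (both value types merged under a common type), and a state store registered under the made-up name "car-buffer":

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class CarAssemblyTransformer implements Transformer<String, Object, KeyValue<String, Car>> {

    private KeyValueStore<String, Car> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // "car-buffer" is the name under which the store is registered on the topology
        store = (KeyValueStore<String, Car>) context.getStateStore("car-buffer");
    }

    @Override
    public KeyValue<String, Car> transform(String carId, Object value) {
        if (value instanceof Car) {
            // buffer the arriving car under its id
            store.put(carId, (Car) value);
        } else if (value instanceof Person) {
            Car car = store.get(carId);
            if (car == null) {
                return null; // ignoring late-arriving cars, as stated above
            }
            car.addPerson((Person) value);
            if (car.getPersons().size() == 4) {
                // all four persons are in: forward the enriched car and clean up
                store.delete(carId);
                return KeyValue.pair(carId, car);
            }
            store.put(carId, car); // not complete yet, keep waiting
        }
        return null; // nothing to emit for this record
    }

    @Override
    public void close() { }
}

The store itself would be created with Stores.keyValueStoreBuilder(...), added via StreamsBuilder#addStateStore(), and its name passed as an additional argument to transform(), if I understand the API correctly.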

Would this be a suitable approach? I'm not sure about scalability, because I have to make sure that, when running multiple instances, the car and person objects with the same id are processed by the same application instance. I would use the selectKey function for this; would that work?

Thanks!


1 Answer

1 vote

The basic design looks sound to me.

However, selectKey() by itself will not be sufficient, because transform() (in contrast to DSL operators like joins and aggregations) does not trigger auto-repartitioning. Thus, you need to repartition manually via through().

stream.selectKey(...)                    // set the car id as the new record key
      .through("user-created-topic")     // write to and read back from this topic to repartition by the new key
      .transform(...);                   // stateful processing now runs co-partitioned by car id

https://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning