
I have a topic which contains user connection and disconnection events for each session. I would like to use Kafka Streams to process this topic and update a KTable based on some condition. A single record is not enough to decide whether the KTable should be updated; I need to process multiple records to know.

For example, process the stream and group by user, then by session ID. If at least one session of a user has only a Connected event, the KTable must mark the user as online (if not already).
If all sessions of the user have a Disconnected event, the KTable must mark the user as offline (if not already).

How can I implement such a logic?
Can we implement this KTable in all application instances so that each instance has this data available locally?


1 Answer


Sounds like a rather complex scenario.

Maybe it's best to use the Processor API for this case? A KTable is basically just a key-value store, and the Processor API lets you apply arbitrarily complex processing to decide whether or not to update the state store. A KTable itself does not allow you to apply such logic; it simply applies every update it receives.
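For illustration, here is a minimal sketch (plain Java, Kafka wiring omitted) of the per-user decision logic such a processor could keep in its state store. The names `UserSessionTracker`, `onEvent`, and `Status` are my own assumptions for this sketch, not part of any Kafka API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: tracks the latest event per session for one user and
// reports a status transition only when the user's overall state flips.
class UserSessionTracker {
    enum Status { ONLINE, OFFLINE }

    // sessionId -> true if the last event for that session was "Connected"
    private final Map<String, Boolean> sessions = new HashMap<>();
    private Status lastEmitted = Status.OFFLINE;

    // Returns a new status only when it differs from the last emitted one,
    // so the downstream KTable is updated only on real transitions.
    Optional<Status> onEvent(String sessionId, boolean connected) {
        sessions.put(sessionId, connected);
        boolean anyConnected = sessions.containsValue(true);
        Status current = anyConnected ? Status.ONLINE : Status.OFFLINE;
        if (current != lastEmitted) {
            lastEmitted = current;
            return Optional.of(current);
        }
        return Optional.empty();
    }
}
```

In a real topology, a processor would keep one such per-user state entry in a `KeyValueStore` and forward a record downstream only when `onEvent` returns a value.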

Thus, using the DSL, you would need to do some pre-processing, and emit an update record only when the KTable should actually change. Something like this:

KStream stream = builder.stream("input-topic");
// apply your processing and emit a record into `updates` only when the
// user's online/offline status actually changes
KStream updates = stream...
KTable table = updates.toTable();