2 votes

I have two compacted topics. One contains all information about my users (key: USERID) and the other saves their addresses (key: USERID,ADRESSID). What I want is to save, in only one topic, the user data together with their list of addresses. My approach is this:

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
    .selectKey(...)  //Selecting USERID as key - this generates a KStream
    .groupByKey(...) //Grouping by USERID - this generates a KGroupedStream
    .aggregate(...); //Aggregating per USERID - this generates a KTable

aggregatedAddresses.toStream().to("aggregated_addresses"); //write the KTable's changelog, keyed by USERID
At the end, I do a leftJoin between the user table and aggregated_addresses on USERID and save the result to a compacted topic called "user_addresses".
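In case it helps, this is roughly what that join step looks like; the topic name "user", the builder variable, and the UserWithAddresses holder class are placeholders, not my exact code:

// Placeholder value type combining a user with its address list.
class UserWithAddresses {
    final GenericRecord user;
    final ArrayList<GenericRecord> addresses;
    UserWithAddresses(GenericRecord user, ArrayList<GenericRecord> addresses) {
        this.user = user;
        this.addresses = addresses;
    }
}

KTable<GenericRecord, GenericRecord> users = builder.table("user"); // compacted topic, key = USERID

users.leftJoin(aggregatedAddresses,
        (user, addresses) -> new UserWithAddresses(user, addresses)) // addresses may be null in a left join
    .toStream()
    .to("user_addresses"); // serde configuration omitted for brevity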

What I want to achieve is to keep every user with all of their addresses in user_addresses. That means I don't want to lose any addresses after a period of time, only when an address was removed from the database. My question is whether my approach is a good one to achieve this. My prototype is working and saves a list of addresses for every user, but I am asking myself whether a KGroupedStream will remove some records after a while or not.

Maybe somebody can explain to me in detail how this pipeline works. When a new record (address) comes in, does it go through the whole pipeline (selectKey, groupByKey, aggregate) and end up in the topic aggregated_addresses, where it is saved as part of a list of addresses? The aggregate step uses this statement:

(user, address, queue) -> {...}

Is Kafka Streams using aggregated_addresses to fill the queue of the statement above? I mean, when a new record arrives at .aggregate, does Kafka search for the corresponding aggregated list in aggregated_addresses and fill the queue with this data? Or does it use the grouped records of .groupByKey, so that every time a new record comes in, the whole group is sent to be aggregated? If the latter is true, will a KGroupedStream remove some records, for example, after one week? If yes, some addresses would be missing from the queue.

What is the internal difference between a KGroupedStream and a KGroupedTable?

It's interesting that the result after the join (in the compacted topic called user_addresses) has more entries than the user table. I looked deeper and saw that a user with the same key occurs multiple times (at multiple offsets). At the smallest offset this user has no addresses, at a higher offset it has one address in its list, and at the highest offset it has two addresses in its list. I am again asking myself why old offsets are not automatically removed when I am using a compacted topic. Is Kafka's compaction working like a garbage collector that removes old data afterwards? If I search for a key, will I get the record with the highest offset?

I am sorry for so many questions, but as I work more and more with streams, some things are unclear to me.

Thanks in advance for your help! :)


1 Answer

3 votes

I am asking myself whether a KGroupedStream will remove some records after a while or not.

It won't delete anything.

If I understand the rest of your question correctly, you are asking how the aggregate() operator works. It uses a local state store (implemented with RocksDB) to store <userId, X>, with X being whatever your aggregation UDF ((user, address, queue) -> { }) returns, i.e., it should be X == queue. Thus, each input record triggers a local lookup into RocksDB to fetch the current queue, updates it, writes it back to RocksDB, and sends it downstream into your to() operator, which also writes it into the result topic.
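To make this concrete, here is a rough sketch of such an aggregation (groupedAddresses and the two serdes are placeholder names, not taken from your code):

KTable<GenericRecord, ArrayList<GenericRecord>> aggregated = groupedAddresses.aggregate(
    ArrayList::new,               // initializer: creates the empty queue for a USERID seen for the first time
    (userId, address, queue) -> { // adder: queue is the current value fetched from RocksDB
        queue.add(address);       // update the aggregate
        return queue;             // written back to RocksDB and sent downstream
    },
    Materialized.with(userIdSerde, addressListSerde)); // placeholder serdes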

Also read the docs for more details: https://kafka.apache.org/21/documentation/streams/ There is also plenty of other material about Kafka Streams and how it works on the Internet (blog posts, talk recordings, slides, ...).

It's interesting that the result after the join (in the compacted topic called user_addresses) has more entries than the user table. I looked deeper and saw that a user with the same key occurs multiple times (at multiple offsets). At the smallest offset this user has no addresses, at a higher offset it has one address in its list, and at the highest offset it has two addresses in its list. I am again asking myself why old offsets are not automatically removed when I am using a compacted topic. Is Kafka's compaction working like a garbage collector that removes old data afterwards?

Compaction is done asynchronously in the background, but not immediately. Also note that topics (or, to be more precise, partitions) are split into "segments", and the active segment is never compacted (the default segment size is 1GB). You can configure the segment size and how often compaction is triggered (read the docs for more details: https://kafka.apache.org/documentation/#compaction).
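For example, you could create the result topic with smaller segments and a lower dirty ratio so that compaction can kick in sooner. A sketch using the AdminClient; the values are illustrative, only the config names are standard Kafka topic settings:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.*;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            configs.put("cleanup.policy", "compact");        // enable log compaction
            configs.put("segment.bytes", "104857600");       // roll segments at 100MB instead of 1GB
            configs.put("min.cleanable.dirty.ratio", "0.1"); // let the log cleaner run earlier (default 0.5)
            NewTopic topic = new NewTopic("user_addresses", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}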

If I search for a key, will I get the record with the highest offset?

Not sure what you mean by this. Kafka only allows sequential reads, not key lookups. Thus, you would need to read the topic from beginning to end to find the latest version of a key. If you are referring to Kafka Streams' "Interactive Queries" feature, it queries the local RocksDB store and thus returns the latest entry for each key.
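For example, assuming you materialized the aggregation under the (placeholder) store name "aggregated-addresses-store", a key lookup would go against the local store of your running KafkaStreams instance:

// streams is the running KafkaStreams instance; the store name and someUserId are placeholders.
ReadOnlyKeyValueStore<GenericRecord, ArrayList<GenericRecord>> store =
    streams.store("aggregated-addresses-store", QueryableStoreTypes.keyValueStore());

ArrayList<GenericRecord> addresses = store.get(someUserId); // latest value for this USERID, or null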

My question is whether my approach is a good one to achieve this.

Yes, with one important detail that is related to

What is the internal difference between a KGroupedStream and a KGroupedTable?

Because your input topic is a compacted topic that uses (userId,addressId) as its key, you should read it as a table() (not a stream()):

KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses =
    builder.table("address-topic") //read as a KTable - update semantics
      .groupBy(...)                //select USERID as the new key and group by it - this generates a KGroupedTable
      .aggregate(...);             //Aggregating per USERID - this generates a KTable

aggregatedAddresses.toStream().to("aggregated_addresses"); //KTable with USERID as key

The difference is that if you read a topic as a KStream, its records are interpreted as "facts", and thus there are no delete semantics. Your input topic, however, contains "update" records, and thus it should be consumed as such. A KGroupedStream and a KGroupedTable are just intermediate objects in the API, and they also imply "fact" vs. "update" semantics. Again, check out the docs and other material on the Internet for more details.
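For completeness: a KGroupedTable.aggregate() takes both an adder and a subtractor, which is what gives you the update/delete semantics. A rough sketch (serde names are placeholders):

KTable<GenericRecord, ArrayList<GenericRecord>> aggregated = groupedTable.aggregate(
    ArrayList::new,                                                       // initializer
    (userId, address, queue) -> { queue.add(address); return queue; },    // adder: new or updated address
    (userId, address, queue) -> { queue.remove(address); return queue; }, // subtractor: deleted or replaced address
    Materialized.with(userIdSerde, addressListSerde)); // placeholder serdes

This way, when an address is deleted from the input topic (a tombstone record) or its value changes, the old address is subtracted from the list instead of lingering there forever.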