I have two compacted topics. One contains all information about my user (USERID) and the other one stores their addresses (USERID,ADRESSID). The keys are shown in parentheses. What I want is a single topic holding each user's data together with their list of addresses. My approach is this:
KTable<GenericRecord, ArrayList<GenericRecord>> aggregatedAddresses = adressStream
    .selectKey(...)   // selecting USERID as key - this returns a KStream
    .groupByKey(...)  // grouping by USERID as key - this returns a KGroupedStream
    .aggregate(...);  // aggregating by USERID as key - this returns a KTable

aggregatedAddresses.toStream().to("aggregated_addresses"); // write the KTable's changelog, keyed by USERID
My goal is to keep all users with their addresses in user_addresses. That means I don't want to lose any addresses after a period of time; an address should only disappear if it was removed in the database. My question is whether my approach achieves this. My prototype is working and saves a list of addresses for every user, but I am asking myself whether KGroupedStream will remove some records after a while or not.
Maybe somebody can explain to me in detail how this pipeline works. When a new record (an address) comes in, does it go through the whole pipeline (selectKey, groupByKey, aggregate) and end up in the topic aggregated_addresses, where it is saved as part of a list of addresses? The aggregate step uses this statement:
(user, address, queue) -> {...}
Is Kafka Streams using aggregated_addresses to fill the queue in the statement above? I mean, when a new record arrives at .aggregate, will Kafka look up the corresponding aggregated list in aggregated_addresses and fill the queue with that data? Or does it use the grouped records from .groupByKey, so that every time a new record comes in, the whole grouped stream is sent to be aggregated again? If the second one is true, will KGroupedStream remove some records, for example after one week? If yes, some addresses would be missing from the queue.
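To make my question concrete, here is my current mental model of what .aggregate does, sketched in plain Java without Kafka (all names here are hypothetical, this is just how I imagine the state store works): one aggregate is kept per key, and each incoming record only updates the aggregate for its own key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregateSketch {
    // Simulates the state store backing .aggregate(): one aggregate per key.
    static final Map<String, List<String>> store = new HashMap<>();

    // Mirrors the (user, address, queue) -> {...} aggregator:
    // fetch the current aggregate for this key, add the new value, store it back.
    static List<String> aggregate(String userId, String address) {
        List<String> queue = store.computeIfAbsent(userId, k -> new ArrayList<>());
        queue.add(address);
        return queue;
    }

    public static void main(String[] args) {
        // Two address records for user "u1", one for "u2".
        aggregate("u1", "addr-1");
        aggregate("u2", "addr-9");
        List<String> u1 = aggregate("u1", "addr-2");
        // Only u1's aggregate is touched by u1's records; other keys are
        // never re-read, and nothing expires on its own in this model.
        System.out.println(u1);              // [addr-1, addr-2]
        System.out.println(store.get("u2")); // [addr-9]
    }
}
```

Is this model correct, or does the real aggregation re-process the grouped stream each time?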
What is the internal difference between a KGroupedStream and a KGroupedTable?
It's interesting that the result after the join (in a compacted topic called user_addresses) has more entries than the user table has. I looked deeper and saw that the same user key occurs multiple times (at multiple offsets). At the smallest offset this user has no addresses, at a higher offset it has one address in its list, and at the highest offset it has two addresses in its list. I am again asking myself why old offsets are not removed automatically when I am using a compacted topic. Does Kafka's compaction work like a garbage collector that removes old data afterwards? And if I search for a key, will I get the value at the highest offset?
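My assumption about compaction, again sketched in plain Java with hypothetical names: the log keeps appending records, and a later compaction pass keeps only the latest offset per key, which would explain why I can still see several occurrences of the same user key before compaction has run.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    record Rec(String key, String value) {}

    // A compaction pass as I imagine it: keep only the latest record per key.
    // Until this runs, older offsets remain visible, which is why the same
    // user key can show up several times in the topic.
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key()); // drop the earlier occurrence...
            latest.put(r.key(), r); // ...so the last write wins
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Rec> log = List.of(
            new Rec("u1", "no addresses"),
            new Rec("u1", "[addr-1]"),
            new Rec("u1", "[addr-1, addr-2]"));
        // After the pass, only the highest-offset value per key remains.
        System.out.println(compact(log));
    }
}
```

Is this roughly what Kafka does in the background, i.e., compaction is asynchronous and only eventually reduces the log to one record per key?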
I am sorry for asking so many questions, but as I work more and more with streams, some things are still unclear to me.
Thanks in advance for your help! :)