
Let's say that I have three Kafka topics filled with events representing business events occurring in different aggregates (an event-sourcing application). These events make it possible to build aggregates with the following attributes:

  • users: userId, name
  • modules of an application: moduleId, name
  • grants of users for modules of the application: grantId, userId, moduleId, scope

Now I want to create a stream of all grants carrying the names of users and products (instead of their ids). I thought of doing it like this:

  1. Create a KTable for users by grouping events by userId. The KTable has userId as its key. This works.
  2. Create a KTable for products by grouping events by productId. The KTable has productId as its key. This works.
  3. Create a stream from the stream of grants and join it with the two KTables. This does not work. The problem is that joins seem to be possible only on the record key. The key of the grant stream is a technical identifier of the grant, whereas the users and products tables are keyed by userId and productId, which are unrelated to the grant key.
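To make the goal concrete, here is a minimal plain-Java model (not Kafka Streams code) of the enrichment being asked for, with `Map`s standing in for the users and modules KTables. The class and record names (`GrantEnrichmentModel`, `User`, `AppModule`, `Grant`, `EnrichedGrant`) are hypothetical. It shows that each grant has to be looked up by its userId and moduleId fields, not by its own grantId key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the desired result; the Maps stand in for the KTables.
final class GrantEnrichmentModel {
    record User(String userId, String name) {}
    record AppModule(String moduleId, String name) {}
    record Grant(String grantId, String userId, String moduleId, String scope) {}
    record EnrichedGrant(String grantId, String userName, String moduleName, String scope) {}

    // Each grant is looked up by its foreign keys (userId, moduleId), never by grantId.
    static List<EnrichedGrant> enrich(List<Grant> grants,
                                      Map<String, User> usersById,
                                      Map<String, AppModule> modulesById) {
        List<EnrichedGrant> out = new ArrayList<>();
        for (Grant g : grants) {
            User u = usersById.get(g.userId());
            AppModule m = modulesById.get(g.moduleId());
            if (u == null || m == null) {
                continue; // inner-join semantics: skip grants whose user or module is unknown
            }
            out.add(new EnrichedGrant(g.grantId(), u.name(), m.name(), g.scope()));
        }
        return out;
    }
}
```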

So how should I proceed?

Is product table same as module table? - Nishu Tayal
No, they are two different tables. But they are used in the same way in this case (just as reference tables to look up information about users and modules). - gentiane
So which key do you want to use in the Grants table to refer to the product ID? - Nishu Tayal
In the Grants stream I want to use the userId field to join to the Users table, and the moduleId field to join to the Modules table. The key of the Grants stream is grantId. - gentiane
Please check the answer and see if it helps. - Nishu Tayal

2 Answers

8
votes

Well, there is currently no direct support for foreign-key joins in Kafka Streams.
There is an open JIRA ticket for it (proposed as KIP-213): https://issues.apache.org/jira/browse/KAFKA-3705

For now, you can work around this problem with KStream-KTable joins.

First, aggregate the user stream and the module stream into their respective KTables, each holding the aggregated collection (or the latest state) of the events.

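// User, Module and the event getters/apply() methods are hypothetical domain types
KTable<String, User> userTable = userStream.groupBy((key, event) -> event.getUserId()).aggregate(User::new, (userId, event, user) -> user.apply(event));
KTable<String, Module> moduleTable = moduleStream.groupBy((key, event) -> event.getModuleId()).aggregate(Module::new, (moduleId, event, module) -> module.apply(event));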

Now select moduleId as the key of the grant stream.

KStream<String, Grant> grantRekeyedStream = grantStream.selectKey((grantId, grant) -> grant.getModuleId());

This changes the key to moduleId (Kafka Streams repartitions the rekeyed stream through an internal topic before the join). Now you can perform a stream-table join with moduleTable: for each grant record, the join looks up the module with the same key in the table. The resulting stream carries the grant and module data together, keyed by moduleId.

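// GrantWithModule is a hypothetical holder type combining both values
KStream<String, GrantWithModule> grantModuleStream = grantRekeyedStream.join(moduleTable, (grant, module) -> new GrantWithModule(grant, module));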

The next step is to join with userTable, so you need to rekey grantModuleStream again, this time by userId.

KStream<String, GrantWithModule> grantModuleRekeyedStream = grantModuleStream.selectKey((moduleId, gm) -> gm.getGrant().getUserId());

Now grantModuleRekeyedStream can be joined with userTable using a KStream-KTable join:

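// GrantWithModuleAndUser is a hypothetical holder type
KStream<String, GrantWithModuleAndUser> grantModuleUserStream = grantModuleRekeyedStream.join(userTable, (gm, user) -> new GrantWithModuleAndUser(gm, user));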

The resulting stream is keyed by userId, and each record carries one grant together with its module and user details.
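The key changes in this workaround (grantId, then moduleId, then userId) can be traced in a small plain-Java simulation. This is a sketch, not Kafka Streams code: lists of key/value pairs stand in for the KStreams, `Map`s stand in for the KTables, and all names (`RekeyJoinSimulation`, `Kv`, `GrantWithModule`, `GrantWithModuleAndUser`) are hypothetical. Both joins use inner-join semantics, so a grant whose module or user is missing is dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of: selectKey(moduleId) -> join(moduleTable) -> selectKey(userId) -> join(userTable)
final class RekeyJoinSimulation {
    record Kv<K, V>(K key, V value) {}
    record Grant(String grantId, String userId, String moduleId, String scope) {}
    record GrantWithModule(Grant grant, String moduleName) {}
    record GrantWithModuleAndUser(GrantWithModule gm, String userName) {}

    static List<Kv<String, GrantWithModuleAndUser>> run(List<Kv<String, Grant>> grantsByGrantId,
                                                       Map<String, String> moduleNames,
                                                       Map<String, String> userNames) {
        // Step 1: rekey from grantId to moduleId
        List<Kv<String, Grant>> byModuleId = new ArrayList<>();
        for (Kv<String, Grant> kv : grantsByGrantId) {
            byModuleId.add(new Kv<>(kv.value().moduleId(), kv.value()));
        }
        // Step 2: inner stream-table join on moduleId
        List<Kv<String, GrantWithModule>> withModule = new ArrayList<>();
        for (Kv<String, Grant> kv : byModuleId) {
            String moduleName = moduleNames.get(kv.key());
            if (moduleName != null) {
                withModule.add(new Kv<>(kv.key(), new GrantWithModule(kv.value(), moduleName)));
            }
        }
        // Steps 3 and 4: rekey from moduleId to userId, then inner stream-table join on userId
        List<Kv<String, GrantWithModuleAndUser>> result = new ArrayList<>();
        for (Kv<String, GrantWithModule> kv : withModule) {
            String userId = kv.value().grant().userId();
            String userName = userNames.get(userId);
            if (userName != null) {
                result.add(new Kv<>(userId, new GrantWithModuleAndUser(kv.value(), userName)));
            }
        }
        return result;
    }
}
```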

2
votes

Foreign-key joins were released as part of Kafka Streams 2.4.0.

Here's an official tutorial on using this feature.
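In 2.4.0 this takes the form of a table-table join overload, `KTable#join(KTable<KO, VO> other, Function<V, KO> foreignKeyExtractor, ValueJoiner<V, VO, VR> joiner)` (KIP-213): the grants would be materialized as a KTable keyed by grantId and joined with the users table using an extractor such as `grant -> grant.getUserId()` (hypothetical getter), with no manual rekeying. One behavioral difference from the KStream-KTable workaround above is that a table-table join also re-emits results when the right side changes, so renaming a user updates every grant that references that user. The plain-Java model below (not Kafka Streams code; names hypothetical, deletions and tombstones ignored) sketches that update behavior for the grants-to-users join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of foreign-key table-table join semantics (grants.userId -> users).
final class ForeignKeyJoinModel {
    record Grant(String grantId, String userId, String scope) {}
    record GrantWithUser(String grantId, String userName, String scope) {}

    private final Map<String, Grant> grants = new HashMap<>();      // keyed by grantId
    private final Map<String, String> userNames = new HashMap<>();  // keyed by userId

    // Left-side update: re-evaluate the join for this one grant.
    List<GrantWithUser> upsertGrant(Grant g) {
        grants.put(g.grantId(), g);
        List<GrantWithUser> emitted = new ArrayList<>();
        String name = userNames.get(g.userId());
        if (name != null) {
            emitted.add(new GrantWithUser(g.grantId(), name, g.scope()));
        }
        return emitted;
    }

    // Right-side update: re-emit every grant whose foreign key points at this user.
    List<GrantWithUser> upsertUser(String userId, String name) {
        userNames.put(userId, name);
        List<GrantWithUser> emitted = new ArrayList<>();
        for (Grant g : grants.values()) {
            if (g.userId().equals(userId)) {
                emitted.add(new GrantWithUser(g.grantId(), name, g.scope()));
            }
        }
        return emitted;
    }
}
```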