I want to do a KStream-to-KTable join, using the KTable purely as a lookup table. The steps below show the order in which the code is executed:
Construct KTable
ReKey KTable
Construct KStream
ReKey KStream
Join KStream - KTable
Let's say there are 8000 records in the KStream and 14 records in the KTable, and assume that for each key in the KStream there is a matching record in the KTable. The expected output is therefore 8000 records.
Whenever I run the join for the first time, or when I start the application, I expect 8000 records, but sometimes I see only 6200 records, sometimes the complete set of 8000 records (twice), sometimes no records at all, etc.
Question 1: Why is the number of records inconsistent every single time I run the application?
Before the KTable is fully constructed (construct + rekey), the KStream is already constructed and its data is available for the join. The join then starts without the KTable, so no data appears in the join output until the KTable is constructed. Once the KTable is constructed, the join happens for the remaining records.
Question 2: How can I resolve the inconsistent join results?
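To illustrate the behaviour described above, here is a minimal plain-Java sketch. It is only a model and does not use the Kafka Streams API: the class `StreamTableJoinModel` and its methods are hypothetical names made up for illustration. It assumes an inner join in which each stream record is looked up against whatever the table contains at the moment the record is processed, and that records are processed strictly in arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a stream-table inner join (NOT Kafka Streams code).
class StreamTableJoinModel {

    // One input event: either a table update or a stream record.
    static final class Event {
        final boolean isTableUpdate;
        final String key;
        final String value;

        Event(boolean isTableUpdate, String key, String value) {
            this.isTableUpdate = isTableUpdate;
            this.key = key;
            this.value = value;
        }
    }

    static Event table(String key, String value) {
        return new Event(true, key, value);
    }

    static Event stream(String key, String value) {
        return new Event(false, key, value);
    }

    // Processes events in arrival order. A stream record is joined against the
    // table state as it is at that moment; if no match exists yet, it is dropped.
    static List<String> join(List<Event> events) {
        Map<String, String> tableState = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (e.isTableUpdate) {
                tableState.put(e.key, e.value);
            } else {
                String match = tableState.get(e.key);
                if (match != null) {
                    output.add(e.value + "+" + match);
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Table populated before any stream record arrives: every record joins.
        System.out.println(join(Arrays.asList(
                table("k1", "T1"), stream("k1", "s1"), stream("k1", "s2"))));
        // prints [s1+T1, s2+T1]

        // First stream record arrives before the table entry: it is lost.
        System.out.println(join(Arrays.asList(
                stream("k1", "s1"), table("k1", "T1"), stream("k1", "s2"))));
        // prints [s2+T1]
    }
}
```

In this model the same input data produces a different number of output records depending only on whether the table entries arrive before or after the stream records, which matches the kind of run-to-run variation I am seeing.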
I also tried a test case using Embedded Kafka for the KStream-KTable join. It processed 10 records from the KStream and 3 records from the KTable. When I ran the test case for the first time, there was no join and I didn't see any data after the join. When I ran the same test a second time, it worked perfectly. If I clear the state store, I am back to zero.
Question 3: Why does this behaviour occur?
I tried KSQL and the join worked perfectly: I got 8000 records. I then looked at the KSQL source code and noticed that KSQL uses the same join function.
Question 4: How does KSQL avoid the issue?
I saw a few suggested answers:
- Use a GlobalKTable, which didn't work; I got the same inconsistent join.
- Use a custom joiner (https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java), which didn't work either.
I am using Spring Cloud Stream as a dependency.
I also saw that there is an open issue about this somewhere on JIRA.