
I recently started experimenting with Kafka Streams. I have a scenario where I need to join a KStream with a KTable, and the KTable may not contain some of the keys. In that case I get a NullPointerException.

Specifically, I was getting:

stream-thread [StreamThread-1] Streams application error during processing: java.lang.NullPointerException

I don't know how to handle that. I can't find a way to filter out the stream records that have no corresponding table entry.

Update

Looking a bit further, I found that I can query the underlying store through the ReadOnlyKeyValueStore interface to check whether a key exists.

In that case, my question is: would that be the best way to go, i.e. filtering the stream to be joined based on whether a key exists in the local store?

My second question: since I plan to use the Global State Store introduced in version 0.10.2 in a later phase, should I expect to be able to query the Global State Store in the same way?

Update

The previous update is not accurate, since it's not possible to query the state store from inside the topology.

Final update

After understanding the join semantics a bit better, I was able to solve the issue just by simplifying the valueJoiner so that it only returns the joined values instead of performing actions on them, and by adding an extra filtering step after the join to filter out null values.

I am still a little puzzled. When exactly do you get the NullPointerException? Which version of Kafka do you use? What "type" of join do you use (i.e., inner or left join)? Also check out this: docs.confluent.io/current/streams/… - Matthias J. Sax
You might want to answer your own question and accept that answer. Or delete the question after all :) - Matthias J. Sax
I was trying to do a leftJoin on the stream. I updated the question with the message I was getting. I am on version 0.10.1. I will write up the answer properly and submit it. Thanks :) - LetsPlayYahtzee
You might want to upgrade to 0.10.2: it offers an inner KStream-KTable join, so there will be no null values when the ValueJoiner gets called. By the way, you don't need to upgrade your brokers for this: docs.confluent.io/current/streams/… - Matthias J. Sax
Oh, really cool. I hadn't noticed that. Yep, this simplified things a lot. Thanks! - LetsPlayYahtzee

Answer


The solution to my problem came from understanding the join semantics a bit better.

As with database joins (although I am not claiming that KStream joins follow database join semantics precisely), a left join produces records with null values wherever the right-side key is missing.
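As a rough illustration of the left-join case, here is a plain-Java model of KStream-KTable left-join semantics. It does not use the Kafka Streams API: the KTable is assumed to be a `Map` holding the latest value per key, the KStream a list of key/value entries, and `java.util.function.BiFunction` stands in for `ValueJoiner`. The class `LeftJoinModel`, its `leftJoin` method, and the sample data are hypothetical. It shows that every stream record reaches the joiner, with `null` on the table side when the key is missing, and that a joiner which dereferences that side throws a `NullPointerException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of KStream-KTable left-join semantics (hypothetical; not the Kafka Streams API).
public class LeftJoinModel {

    // Every stream record produces one output. When the table has no entry for the
    // record's key, the joiner receives null as its table-side argument.
    public static <K, V, VT, VR> List<VR> leftJoin(List<Map.Entry<K, V>> stream,
                                                   Map<K, VT> table,
                                                   BiFunction<V, VT, VR> joiner) {
        List<VR> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            VT right = table.get(rec.getKey()); // null when the key is missing
            out.add(joiner.apply(rec.getValue(), right));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = Map.of("alice", "Berlin");
        List<Map.Entry<String, String>> stream =
                List.of(Map.entry("alice", "click"), Map.entry("bob", "click"));

        // A joiner that only combines the values tolerates the null table side.
        List<String> joined = leftJoin(stream, table, (v, t) -> v + "@" + t);
        System.out.println(joined); // prints [click@Berlin, click@null]

        // A joiner that dereferences the table side fails on "bob".
        try {
            leftJoin(stream, table, (v, t) -> v + "@" + t.toUpperCase());
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: key missing from the table");
        }
    }
}
```

In the real DSL, the corresponding call is `KStream#leftJoin(KTable, ValueJoiner)`, whose joiner likewise receives `null` for the table side when there is no matching entry.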

So in the end, all I had to do was decouple my valueJoiner from the subsequent calculations (I needed to perform some calculations on fields of the joined records and return a newly constructed object) and have it only return an array of the joined values. I could then filter out the records that contained null values by checking those arrays, and perform the calculations afterwards.
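The decoupled approach can be sketched in the same plain-Java style, again assuming a `Map` for the table, a list of entries for the stream, and `BiFunction` in place of `ValueJoiner`. The class `JoinThenFilter`, the order/price data, and the `totals` calculation are hypothetical stand-ins for the actual computation. The joiner only packs both sides into an `Object[]`, a filter drops arrays whose table side is `null`, and the calculation runs afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Plain-Java model of "left join, then filter, then compute" (hypothetical; not the Kafka Streams API).
public class JoinThenFilter {

    // Left-join model: the table side is null for keys missing from the table.
    static <K, V, VT, VR> List<VR> leftJoin(List<Map.Entry<K, V>> stream, Map<K, VT> table,
                                            BiFunction<V, VT, VR> joiner) {
        List<VR> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            out.add(joiner.apply(rec.getValue(), table.get(rec.getKey())));
        }
        return out;
    }

    // Joins order quantities (stream) with unit prices (table) and returns the order
    // totals, skipping orders whose product has no price.
    public static List<Integer> totals(List<Map.Entry<String, Integer>> orders,
                                       Map<String, Integer> prices) {
        // 1. The joiner performs no calculation; it only returns both sides as an array.
        List<Object[]> joined = leftJoin(orders, prices, (qty, price) -> new Object[] {qty, price});

        return joined.stream()
                // 2. Drop records whose table side is null.
                .filter(pair -> pair[1] != null)
                // 3. Only now compute on the joined fields.
                .map(pair -> (Integer) pair[0] * (Integer) pair[1])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> prices = Map.of("apple", 3, "pear", 5);
        List<Map.Entry<String, Integer>> orders =
                List.of(Map.entry("apple", 2), Map.entry("kiwi", 4), Map.entry("pear", 1));
        System.out.println(totals(orders, prices)); // prints [6, 5]
    }
}
```

In the Kafka Streams DSL this would roughly correspond to `leftJoin(...)` followed by `filter(...)` and `mapValues(...)` on the resulting `KStream`; that mapping is an assumption, not code from the original post.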

Following Matthias J. Sax's suggestion, I switched to Kafka Streams 0.10.2 instead of 0.10.1 (0.10.2 works with 0.10.1 brokers) and replaced the whole leftJoin logic with an inner join, which removes the need to filter out null values.
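With an inner join, records whose key is missing from the table never reach the joiner, so the null filter disappears and the joiner can compute directly. The sketch below is again a plain-Java model, not the Kafka Streams API; `InnerJoinModel`, `innerJoin`, and the sample data are hypothetical. In the 0.10.2 DSL, the corresponding change would be replacing `stream.leftJoin(table, joiner)` plus the filter with `stream.join(table, joiner)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of KStream-KTable inner-join semantics (hypothetical; not the Kafka Streams API).
public class InnerJoinModel {

    // Records whose key has no table entry are dropped before the joiner runs,
    // so the joiner never sees a null table-side value.
    public static <K, V, VT, VR> List<VR> innerJoin(List<Map.Entry<K, V>> stream, Map<K, VT> table,
                                                    BiFunction<V, VT, VR> joiner) {
        List<VR> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : stream) {
            VT right = table.get(rec.getKey());
            if (right != null) {
                out.add(joiner.apply(rec.getValue(), right));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> prices = Map.of("apple", 3, "pear", 5);
        List<Map.Entry<String, Integer>> orders =
                List.of(Map.entry("apple", 2), Map.entry("kiwi", 4), Map.entry("pear", 1));
        // The joiner computes directly; no separate null filter is needed.
        List<Integer> totals = innerJoin(orders, prices, (qty, price) -> qty * price);
        System.out.println(totals); // prints [6, 5]
    }
}
```

The design trade-off is that the inner join silently drops unmatched stream records, which is what this use case wants; a left join remains the choice when unmatched records must still be emitted.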