I recently started experimenting with Kafka Streams. I have a scenario where I need to join a KStream with a KTable. It may be the case that the KTable does not contain some of the keys. In that case I get a NullPointerException.
Specifically, I was getting:
stream-thread [StreamThread-1] Streams application error during processing: java.lang.NullPointerException
I don't know how to handle that. I can't find a way to filter out the stream records that have no corresponding table entry.
update
Looking a bit further, I found that I can query the underlying store through the ReadOnlyKeyValueStore interface to check whether a key exists.
In that case my question is: would that be the best way to go, i.e. filtering the stream to be joined based on whether a key exists in the local store?
My second question: since I want to leverage the Global State Store introduced in version 0.10.2 in a later phase, should I expect to be able to query the Global State Store in the same manner?
update
The previous update is not accurate, since it's not possible to query the state store from inside the topology.
final update
After understanding the join semantics a bit better, I was able to solve the issue just by simplifying the valueJoiner so that it only returns the result, instead of performing actions on the joined values, and by adding an extra filtering step after the join to filter out null values.
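A minimal sketch of that final approach, written as a plain-Java model rather than real Kafka Streams code. The class `LeftJoinSketch`, its helper methods, and the sample keys are all made up for illustration. It assumes the left-join behaviour described above: the joiner is invoked for every stream record and receives `null` when the table has no entry for the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Hypothetical stand-in for the topology; none of these names come from Kafka.
class LeftJoinSketch {

    // Models a stream-table left join: the joiner runs for every stream
    // record and gets null as the table value when the key is absent.
    static List<SimpleEntry<String, String>> leftJoin(
            List<SimpleEntry<String, String>> stream,
            Map<String, String> table,
            BiFunction<String, String, String> joiner) {
        List<SimpleEntry<String, String>> joined = new ArrayList<>();
        for (SimpleEntry<String, String> record : stream) {
            String tableValue = table.get(record.getKey()); // null if key is absent
            joined.add(new SimpleEntry<>(record.getKey(),
                    joiner.apply(record.getValue(), tableValue)));
        }
        return joined;
    }

    // The joiner only builds and returns a result. It never calls a method
    // on the table value, so a missing key cannot raise a NullPointerException.
    static String joiner(String streamValue, String tableValue) {
        return tableValue == null ? null : streamValue + "@" + tableValue;
    }

    // Join first, then drop the records that had no table entry.
    static List<String> joinAndFilter(
            List<SimpleEntry<String, String>> stream,
            Map<String, String> table) {
        return leftJoin(stream, table, LeftJoinSketch::joiner).stream()
                .filter(e -> e.getValue() != null)
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, String>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>("alice", "click"));
        stream.add(new SimpleEntry<>("bob", "click")); // no table entry for bob

        Map<String, String> table = new HashMap<>();
        table.put("alice", "eu");

        System.out.println(joinAndFilter(stream, table)); // prints [alice=click@eu]
    }
}
```

In the real topology the same shape would be a `leftJoin` on the KStream followed by a `filter` that keeps only non-null values; any side effects on the joined values would move to a step after that filter.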
Where do you get the NullPointerException exactly? What version of Kafka do you use? What "type" of join do you use (i.e., inner or left join)? Also check out this: docs.confluent.io/current/streams/… – Matthias J. Sax
I use leftJoin on the stream. I updated the question with the message I was getting. I am on version 0.10.1. I will formulate the answer properly and submit it. Thanks :) – LetsPlayYahtzee
You might want to upgrade to 0.10.2 -- it offers inner KStream-KTable join and thus, there will be no null values when ValueJoiner gets called. Btw: you don't need to upgrade your brokers for this: docs.confluent.io/current/streams/… – Matthias J. Sax