
I'd like to use the Kafka Streams API to perform a left join on a KStream with a KTable to add some fields of the table to the stream.

Everything works fine with a smaller version of the table (around 1,300 entries) containing all relevant entries.

Once I use the whole table (around 200,000 entries) I get a NullPointerException in the line where I get the relevant field of the Avro message (GenericRecord) of the KTable.

When I perform the same left join operation in KSQL, the added fields from the table are NULL. The relevant join keys exist in the table, but querying them in KSQL takes approx. 20 seconds until they show up.

Is it possible that the table is too large to perform a left join? If that's the case, is there anything I can do to make the join work using the whole table?

For development purposes I'm currently using Confluent Platform with a single Kafka broker and all topics have just a single partition.

Stack Trace:

[myclass-68507371-7b8e-4bdc-8715-73d0307c9058-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [myclass-68507371-7b8e-4bdc-8715-73d0307c9058-StreamThread-1] Failed to process stream task 3_0 due to the following error:
java.lang.NullPointerException
  at MyClass.lambda$main$6(MyClass.java:184)
  at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:73)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
  at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
  at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
  at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
  at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)

The field is nullable in the Avro schema.

Can you share the stack trace? Because RocksDB is used to store the table state, and hence data is spilled to disk, the size of the table should not be the problem. (Matthias J. Sax)

1 Answer


As the stack trace indicates (the error originates from MyClass.lambda$main$6(MyClass.java:184)), the exception is thrown from your own code, which must be your ValueJoiner implementation. Because you use leftJoin(), the KTable value passed to the joiner may be null when no table record exists for the stream record's key. I assume your code does not handle null correctly. If you want to avoid getting null at all and instead drop KStream records that have no corresponding KTable record, you could use join() instead of leftJoin().
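
A minimal sketch of the null check, under some assumptions: the original joiner code was not posted, so the class name NullSafeJoin, the method enrich and the field name "region" are hypothetical, and plain Maps stand in for the Avro GenericRecord values so the example compiles without Kafka on the classpath. The method has the same shape as ValueJoiner#apply(streamValue, tableValue).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ValueJoiner passed to KStream#leftJoin.
// Maps replace Avro GenericRecord; "region" is an assumed field name.
class NullSafeJoin {

    // Same shape as ValueJoiner#apply(streamValue, tableValue).
    // With a left join, tableValue is null when no table record
    // exists for the stream record's key.
    static Map<String, Object> enrich(Map<String, Object> streamValue,
                                      Map<String, Object> tableValue) {
        Map<String, Object> out = new HashMap<>(streamValue);
        // Check for a missing table record before reading any field from it.
        Object region = (tableValue == null) ? null : tableValue.get("region");
        out.put("region", region);
        return out;
    }
}
```

In a real topology the same check would go at the top of the lambda passed to leftJoin(), before the first call to get() on the table-side GenericRecord. Whether to emit a null field, a default value, or switch to join() depends on what downstream consumers expect.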