I am trying to join two Kafka topics. One a KStream and other a KTable. The left join complains that state store for the processor is not present. I did look at many samples of code in kafka GitHub and elsewhere where StateStore is not explicitly created by KStream client code. Please let know what is missing from below code.
The application stream is left joined with users table to emit records with app and user together. An application has an owner who is an user.
version: 1.1.0
Thanks
public void process() {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
config.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Common.KAFKA_SOCKET);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.applicationSerde);
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
// User properties: userid, username
KTable<String, User> users = new StreamsBuilder().table(TOPIC_USERS,
Consumed.with(Serdes.String(), CustomSerdes.serdeFor(User.class)));
StreamsBuilder builder = new StreamsBuilder();
// Application properties: id, name
KStream<String, Application> stream = builder.stream(TOPIC_APPLICATIONS);
stream.
map((appId, app) -> KeyValue.pair(app.getOwnerId(), app.getAppId()))
.leftJoin(users, (app, user) -> "a:" + app + " u:" + user.getUserName())
.to(OUTPUT_TOPIC);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
StreamsManager.startAndHandleShutdown(streams);
}
Error:
Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore topic-users-STATE-STORE-0000000000 is not added yet.
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:797)
at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:817)
at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:805)
at com.test.streams.users.AppWithUserConsumerMain.process(AppWithUserConsumerMain.java:50)