1
votes

I am trying to join two Kafka topics. One a KStream and other a KTable. The left join complains that state store for the processor is not present. I did look at many samples of code in kafka GitHub and elsewhere where StateStore is not explicitly created by KStream client code. Please let know what is missing from below code.

The application stream is left joined with users table to emit records with app and user together. An application has an owner who is an user.

version: 1.1.0

Thanks

  public void process() {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Common.KAFKA_SOCKET);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.applicationSerde);
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
    config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

    // User properties: userid, username
    KTable<String, User> users = new StreamsBuilder().table(TOPIC_USERS,
        Consumed.with(Serdes.String(), CustomSerdes.serdeFor(User.class)));

    StreamsBuilder builder = new StreamsBuilder();
    // Application properties: id, name
    KStream<String, Application> stream = builder.stream(TOPIC_APPLICATIONS);

    stream.
        map((appId, app) -> KeyValue.pair(app.getOwnerId(), app.getAppId()))
        .leftJoin(users, (app, user) -> "a:" + app + " u:" + user.getUserName())
        .to(OUTPUT_TOPIC);

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    StreamsManager.startAndHandleShutdown(streams);
  }

Error:

Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore topic-users-STATE-STORE-0000000000 is not added yet.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:797)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:817)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:805)
    at com.test.streams.users.AppWithUserConsumerMain.process(AppWithUserConsumerMain.java:50)
1

1 Answers

2
votes

to be able to use join, both parts of joining (in your case KStream and KTable) should be created from the same StreamsBuilder, so they will belong to the same topology.

in your case you created two StreamsBuilder, and as a result, KStream and KTable don't belong to the same topology.