I'm attempting to join a KStream with a KTable. Without the join I have no problem reading from the intermediate topic "book-attribute-by-id".

Sample message for the KTable:

{key: {id: 1},
 value: {id: 1, attribute_name: "weight"}}

Sample message for the KStream:

{key: {id: 1},
 value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}

Desired output to 'final aggregation' topic:

{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}

Here is the code:

    KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
    KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));

    bookAttributeStream
        .selectKey((k, v) -> k.getId())
        .to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));

    KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));

    // when the snippet below is commented out, consuming "book-attribute-by-id" works. 
    bookValueIntStream
        .selectKey((k, v) -> v.getAttribute_id())
        .join(bookAttributeByIdTable, (intValue, attribute) -> {
                System.out.println("intValue: " + intValue);
                System.out.println("attribute: " + attribute);
                return new BookAttributeValue(intValue, attribute);
            });

Exception when joining KStream & KTable:

    Exception in thread "xxx-StreamThread-1" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: stream-thread [xxx-StreamThread-1]Topic not found: book-attribute-by-id
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:792)

1 Answer

I assume you are using kafka-streams 1.0.0.

The issue is that the input topics for your streams have to exist before the application starts, so you have to create them yourself.

In your case the topics are book-attribute-by-id and the ones whose names are held in the variables bookAttributeTopic and bookValueIntTopic.

For joins, Kafka Streams has to ensure that the joined topics have the same number of partitions. The exception is thrown when it tries to fetch the metadata for the topic book-attribute-by-id, which does not exist yet.
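
To make the failure mode concrete, here is a simplified illustration of such a check. This is not the actual Kafka Streams code: the class CopartitionCheckSketch, the validate helper, the topic name book-value-int, and the partition count 4 are all made up for the example. It only shows why a missing topic fails before partition counts can even be compared.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CopartitionCheckSketch {

    // Hypothetical helper, NOT Kafka Streams code: returns the shared partition
    // count of the given topics, or throws if a topic is unknown or the counts differ.
    public static int validate(List<String> joinTopics, Map<String, Integer> partitionsByTopic) {
        int expected = -1;
        for (String topic : joinTopics) {
            Integer partitions = partitionsByTopic.get(topic);
            if (partitions == null) {
                // No metadata for the topic: this mirrors the "Topic not found" case.
                throw new IllegalStateException("Topic not found: " + topic);
            }
            if (expected == -1) {
                expected = partitions;
            } else if (expected != partitions) {
                throw new IllegalStateException("Topics not co-partitioned: " + joinTopics);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("book-value-int", 4); // hypothetical topic name and partition count
        List<String> joinTopics = Arrays.asList("book-value-int", "book-attribute-by-id");

        try {
            validate(joinTopics, metadata);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Topic not found: book-attribute-by-id"
        }

        // Once the topic exists with a matching partition count, the check passes.
        metadata.put("book-attribute-by-id", 4);
        System.out.println(validate(joinTopics, metadata)); // prints 4
    }
}
```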

Before running your application, you have to manually create the book-attribute-by-id topic.
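
If you would rather create the topic from code than from the command line, a minimal sketch with the Kafka AdminClient (part of kafka-clients since 0.11) could look like the following. The broker address localhost:9092, the partition count 4, and the replication factor 1 are assumptions for illustration; adjust them to your cluster. The class name CreateJoinTopic is made up.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateJoinTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: a broker is reachable at localhost:9092.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Assumption: 4 partitions and replication factor 1.
            NewTopic topic = new NewTopic("book-attribute-by-id", 4, (short) 1);
            // Blocks until the broker confirms creation; fails if the topic already exists.
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```

Run this once before starting the Streams application, so that the topic is already there when the topology is validated.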

In newer versions of kafka-streams, the existence of the topics is checked before the number of partitions is validated.