2
votes

I have a few problems creating a KTable with a time window in Kafka Streams.

I want to create a table that counts the number of occurrences of each ID in the stream, like this:

ID (String) |  Count (Long)
    X       |       5
    Y       |       6
    Z       |       7

and so forth. I want to be able to get the table using the Kafka REST API, preferably as JSON.

Here's my code at the moment:

    StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> streams = builder.stream(srcTopic);

    KTable<Windowed<String>, Long> numCount = streams
            .flatMapValues(value -> getID(value))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo"));

The problem I'm facing right now is that the table isn't created as a <String, Long> but as <String, String> instead. This means that I can't get the correct counts: I receive the correct keys, but the counts are corrupted. I've tried to force the value to a Long using Long.valueOf(value), without success. I don't know how to proceed from here. Do I need to write the KTable to a new topic? Since I want the table to be queryable using the Kafka REST API, I don't think that's needed. Am I right? The Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("foo") should make it queryable as "foo", right?

The KTable creates a changelog topic. Is this enough to make it queryable, or do I have to create a new topic for it to write to?

I'm using another KStream to verify the output now.

    KStream<String, String> streamOut = builder.stream(srcTopic);

    streamOut.foreach((key, value) -> System.out.println(key + " => " + value));

and it outputs:

 ID    COUNT
2855 => ~
2857 => �
2859 => �
2861 => V(
2863 => �
2874 => �
2877 => J
2880 => �2
2891 => �=

Either way, I don't really want to use a KStream to collect the output; I want to query the KTable. But as mentioned, I don't really understand how querying works.

Update

Managed to get it to work with

    ReadOnlyWindowStore<String, Long> windowStore =
            kafkaStreams.store("tst", QueryableStoreTypes.windowStore());
    long timeFrom = 0;
    long timeTo = System.currentTimeMillis(); // now (in processing-time)
    WindowStoreIterator<Long> iterator = windowStore.fetch("x", timeFrom, timeTo);
    while (iterator.hasNext()) {
        KeyValue<Long, Long> next = iterator.next();
        long windowTimestamp = next.key;
        System.out.println(windowTimestamp + ":" + next.value);
    }

Many thanks in advance,

1 Answer

4
votes

The output type of the KTable is <Windowed<String>, Long>: the key is a Windowed<String> rather than a plain String because Kafka Streams maintains multiple windows in parallel to allow handling out-of-order data. Thus, there is not a single window instance but many window instances in parallel (cf. https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#hopping-time-windows).

Keeping "older" windows allows them to be updated when data arrives late. Note that Kafka Streams semantics are based on event time.
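To make "many window instances in parallel" concrete, here is a minimal plain-Java sketch (no Kafka dependency) that lists every hopping window containing a given event timestamp. It assumes windows are aligned to the epoch, so every window start is a multiple of the advance interval, and that a window covers [start, start + size). The class name, method name, and the 5-minute/1-minute settings are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns the start timestamps of all hopping windows [start, start + sizeMs)
    // that contain the given event timestamp. Assumes epoch-aligned windows,
    // i.e. every window start is a multiple of advanceMs.
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hypothetical settings: 5-minute windows advancing every minute.
        long sizeMs = 5 * 60_000L;
        long advanceMs = 60_000L;
        long eventTime = 7 * 60_000L + 30_000L; // 7.5 minutes after the epoch
        // prints [180000, 240000, 300000, 360000, 420000]
        System.out.println(windowStartsFor(eventTime, sizeMs, advanceMs));
    }
}
```

With these settings a single record is counted in five overlapping windows, which is why a lookup needs both the key and a time range rather than the key alone.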

You can still query the KTable; you only need to know which window you want to query.

Update

The JavaDoc describes how to query the table: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java#L94-L101

    KafkaStreams streams = ... // counting words
    String queryableStoreName = ... // the name of the store as defined by the Materialized instance
    ReadOnlyWindowStore<String, Long> localWindowStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>windowStore());

    String key = "some-word";
    long fromTime = ...;
    long toTime = ...;
    WindowStoreIterator<Long> countForWordsForWindows = localWindowStore.fetch(key, fromTime, toTime); // key must be local (application state is shared over all running Kafka Streams instances)
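As for the garbled counts printed in the question: they look like what you would get if the Long counts were serialized as 8 big-endian bytes (the layout Kafka's LongSerializer writes) and then read back as text. This is an assumption about the question's setup, not something confirmed there. The plain-Java sketch below reproduces the effect without any Kafka dependency; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CountDecoding {
    // Encodes a count as 8 big-endian bytes (assumed to match the stored layout).
    public static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes the same 8 bytes back into a long.
    public static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encodeLong(126L);
        // Interpreting the bytes as text yields seven NUL characters followed by '~'.
        String asText = new String(raw, StandardCharsets.UTF_8);
        System.out.println("read as String: " + asText.trim()); // prints "read as String: ~"
        System.out.println("read as long: " + decodeLong(raw)); // prints "read as long: 126"
    }
}
```

Under that assumption, the first output line in the question (`2855 => ~`) would correspond to a count of 126, so the stored data is fine and only the value deserializer used for printing is wrong.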