
I have input data that I am fetching using Kafka Streams. All I need to implement is a tumbling window of 5 seconds whose output goes to a Kafka topic. However, I am not able to accomplish this using lambdas. Could someone help?

Below is what I have written, but I am getting errors:

    KTable<TimeWindowedKStream<String, String> , String> result = source.
            groupByKey().windowedBy(TimeWindows.of(TimeUnit.DAYS.toMillis(5000)));

    result.to(Serdes.String(), Serdes.Long(), "outputtopic");

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);

However, for the result variable, Eclipse reports: "Type mismatch: cannot convert from TimeWindowedKStream<String,String> to KTable<TimeWindowedKStream<String,String>,String>".

Also, when writing the value of result to another topic, Eclipse reports: "The method to(Serde<TimeWindowedKStream<String,String>>, Serde<String>, String) in the type KTable<TimeWindowedKStream<String,String>,String> is not applicable for the arguments (Serde<String>, Serde<Long>, String)".

As far as I understand, windowing cannot be achieved without an aggregation of some sort. However, I just want to output the data for every 5-second window to another output topic.

"However, I just want to output the data for every 5-second window to another output topic." -- It's not clear to me what you are trying to achieve. If you window the input stream and then write all the data to an output topic again, why window it in the first place? Or do you want to write one output message per window? In that case, you need to .aggregate() all input records of a window into one output message. You might also want to use the suppress() operator; otherwise, the result might not be what you expect. - Matthias J. Sax

1 Answer


"Type mismatch: cannot convert from TimeWindowedKStream<String,String> to KTable<TimeWindowedKStream<String,String>,String>".

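You have to call an aggregation function on the TimeWindowedKStream to get a table, for example count() or aggregate(...). The resulting table is keyed by Windowed<String>, not by TimeWindowedKStream<String, String>, so the declared type of result has to change as well; count(), for instance, returns KTable<Windowed<String>, Long>.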
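As a minimal sketch of that fix, the snippet below counts records per key in 5-second tumbling windows. It assumes Kafka Streams 3.0 or newer, where TimeWindows.ofSizeWithNoGrace(...) is available (older releases use TimeWindows.of(...) instead), and it relies on default serdes from the application config. The class and method names are hypothetical.

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Hypothetical helper class, for illustration only.
class WindowedCountSketch {

    // Counts records per key in 5-second tumbling windows.
    // Note the result type: the key is Windowed<String> and the value is Long.
    static KTable<Windowed<String>, Long> countPerWindow(KStream<String, String> source) {
        return source
                .groupByKey()
                // Assumes Kafka Streams 3.0+; windows of 5 seconds, no grace period.
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
                // Any aggregation (count, reduce, aggregate) turns the
                // TimeWindowedKStream into a KTable.
                .count();
    }
}
```

With this, `KTable<Windowed<String>, Long> result = WindowedCountSketch.countPerWindow(source);` should compile where the original declaration did not.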

"The method to(Serde<TimeWindowedKStream<String,String>>, Serde<String>, String) in the type KTable<TimeWindowedKStream<String,String>,String> is not applicable for the arguments (Serde<String>, Serde<Long>, String)"

You can't write a KTable to a topic directly; you first have to call KTable::toStream(). The KStream it returns has a to(...) method.
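Putting both points together, a topology along the following lines might work. This is a sketch under assumptions, not a definitive implementation: it assumes Kafka Streams 3.0 or newer, the input topic name "inputtopic" and the class name are hypothetical, and the windowed key is flattened to a plain String so that the String and Long serdes from the question can be used for the output. The suppress() step follows the suggestion in the comment on the question and is optional.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Hypothetical class, for illustration only.
class WindowedOutputSketch {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // "inputtopic" is an assumed name; "outputtopic" is taken from the question.
        builder.stream("inputtopic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // 5-second tumbling windows (Kafka Streams 3.0+ API).
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
                .count()
                // Optional: emit one final count per window instead of every update.
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                // A KTable is converted to a KStream before writing it out.
                .toStream()
                // Flatten Windowed<String> into "key@windowStartMillis".
                .map((windowedKey, count) ->
                        KeyValue.pair(windowedKey.key() + "@" + windowedKey.window().start(), count))
                .to("outputtopic", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

The returned Topology can be passed to new KafkaStreams(topology, props) as in the question. Separately from the two compile errors, note that TimeUnit.DAYS.toMillis(5000) in the question describes a window of 5000 days; for 5 seconds, something like TimeUnit.SECONDS.toMillis(5) or Duration.ofSeconds(5) is probably what was intended.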