This is, in part, a follow-up to "Aggregation over a specific partition in Apache Kafka Streams".
Let's suppose I have a topic named "events" with 3 partitions on which I send string -> integer data like so:
(Bob, 3) on partition 1
(Sally, 4) on partition 2
(Bob, 2) on partition 3
...
I would like to aggregate the values (in this example, just a simple sum) across all partitions to end up with a KTable that looks something like:
(Sally, 4)
(Bob, 5)
As mentioned in the answer to the question linked above, it's not possible to do this kind of cross-partition aggregation directly. However, the answerer noted that it is possible when the messages share the same key (which is true in this case). How might this be accomplished?
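For context, here is a minimal sketch of the approach I understand is commonly suggested for this: use groupBy rather than groupByKey, since groupBy always marks the stream for repartitioning, so records with the same key are routed to the same task before aggregating, even when the producer did not partition by key. The literal topic names "events" and "metric-changes" are assumptions standing in for my constants:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

public class RepartitionSketch {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // groupBy (unlike groupByKey) always triggers a repartition step, so
        // records sharing a key land in the same task regardless of which
        // input partition they arrived on
        KTable<String, Double> totals = builder
            .<String, Double>stream("events")
            .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.Double()))
            .aggregate(() -> 0d, (key, value, agg) -> agg + value);
        totals.toStream().to("metric-changes");
        return builder.build();
    }
}
```

The repartition shows up as an internal "-repartition" topic in the built topology's description.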
I would also like to be able to query these aggregate values from a "global" state store that is replicated across each instance of the Kafka Streams application.
My first thought was to use a GlobalKTable (which I believe, according to this page, should be what I need). However, the changelog topic backing this state store has the same number of partitions as the original "events" topic, and the aggregation still happens on a per-partition basis rather than across all partitions.
This is a slimmed-down version of my application; I'm not really sure where to go from here:
final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
.groupByKey()
.aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);
aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);
// the global table has to be registered before the topology is built,
// otherwise it is not part of the running application
builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));