
Using kafka-streams, I want to group a stream S of elements E by some key K1 while aggregating all values for that same key into a joined result AGG. This results in a KTable T1.

Depending on the aggregated result, the value should be repartitioned into another KTable T2, grouped by a key K2 taken from the aggregated result AGG. So the aggregated result should generate the key for the next regroup.

In the end, I'm only interested in a KTable T2 where the key is K2 and the value is AGG.

However, this does not work. I only get a KTable T for the last value, not a value for each key K2.

I know the results of aggregation are only forwarded after some time, so I already tried to lower commit.interval.ms to 1 but to no avail.

I also tried to use through and write the intermediate results to a stream, but that didn't succeed either.

val finalTable = streamBuilder.stream("streamS")
                    .groupBy{ k, v -> createKey1(k, v) }
                    .aggregate(
                            { Agg.empty() },
                            { k, v, previousAgg ->
                                Agg.merge(previousAgg, v)
                            })
                    .toStream()
//                    .through("table1")
                    .groupBy { k1, agg -> agg.createKey2()}
                    .reduce{ _, agg -> agg }

For a stream S containing the following values:
key1="123", id="1", startNewGroup="false"
key1="234", id="2", startNewGroup="false"
key1="123", id="3", startNewGroup="false"
key1="123", id="4", startNewGroup="true"
key1="234", id="5", startNewGroup="false"
key1="123", id="6", startNewGroup="false"
key1="123", id="7", startNewGroup="false"
key1="123", id="8", startNewGroup="true"

I would like the end result to be a KTable having the following latest key-values:
key: 123-1, value: 'key1="123", key2="123-1", ids="1,3"'
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-4, value: 'key1="123", key2="123-4", ids="4,6,7"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'

The original stream S of elements is first grouped by key1. The aggregated result contains the grouping key key1 and adds an extra field key2, which combines key1 with the id of the first occurrence.
Whenever the aggregation receives a value with startNewGroup set to true, it returns an aggregation whose key2 field is built from key1 and the id of the new value, effectively creating a new subgroup.
In the second regroup, we simply group by the key2 field.

However, what we actually observe is the following:
key: 234-2, value: 'key1="234", key2="234-2", ids="2,5"'
key: 123-8, value: 'key1="123", key2="123-8", ids="8"'
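
To make the expected and observed results above easier to compare, here is a minimal sketch in plain Scala with no Kafka dependency. Event, Agg, merge and the two regroup functions are hypothetical stand-ins, not the asker's classes. It replays the sample stream twice: once regrouping every intermediate aggregate by key2, and once regrouping only the final aggregate per key1, which is one plausible reading of what happens when intermediate updates are buffered upstream.

```scala
object RegroupSketch {
  // Hypothetical stand-ins for the question's element E and aggregate AGG.
  final case class Event(key1: String, id: String, startNewGroup: Boolean)
  final case class Agg(key1: String, key2: String, ids: Vector[String])

  // The sample stream S from the question, in arrival order.
  val sample: Seq[Event] = Seq(
    Event("123", "1", false), Event("234", "2", false),
    Event("123", "3", false), Event("123", "4", true),
    Event("234", "5", false), Event("123", "6", false),
    Event("123", "7", false), Event("123", "8", true)
  )

  // First aggregation: extend the current subgroup, or open a new one when
  // there is no previous aggregate or startNewGroup is set.
  def merge(previous: Option[Agg], e: Event): Agg = previous match {
    case Some(agg) if !e.startNewGroup => agg.copy(ids = agg.ids :+ e.id)
    case _                             => Agg(e.key1, s"${e.key1}-${e.id}", Vector(e.id))
  }

  // Every update of T1 reaches the second grouping; the latest value per key2 wins.
  def regroupEveryUpdate(events: Seq[Event]): Map[String, Agg] =
    events.foldLeft((Map.empty[String, Agg], Map.empty[String, Agg])) {
      case ((t1, t2), e) =>
        val agg = merge(t1.get(e.key1), e)
        (t1.updated(e.key1, agg), t2.updated(agg.key2, agg))
    }._2

  // Only the final value of T1 per key1 reaches the second grouping.
  def regroupFinalOnly(events: Seq[Event]): Map[String, Agg] = {
    val t1 = events.foldLeft(Map.empty[String, Agg]) { (acc, e) =>
      acc.updated(e.key1, merge(acc.get(e.key1), e))
    }
    t1.values.map(agg => agg.key2 -> agg).toMap
  }
}
```

For the sample stream, regroupEveryUpdate yields the four keys 123-1, 234-2, 123-4 and 123-8, while regroupFinalOnly yields only 234-2 and 123-8, which matches the observed output.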

Did you try stream().groupBy().aggregate().groupBy().aggregate() -- using a custom transform() does not seem to be the simplest solution. Note that converting the first result into stream() changes the semantics from "update" to "facts" - Matthias J. Sax
Actually, I did and it worked too, but you need to make sure the materializer in the first aggregate is configured not to cache the results when you want the results to be immediately available downstream. I think otherwise, it will wait and buffer the results. - Jan Bols
It depends on your requirements, but if that is what you need, you can disable caching for the first aggregation via Materialized.as(null).withCachingDisabled(). -- What I don't understand is why you say "However, this does not work. I only get a KTable T for the last value. Not a value for each key K2". It is also unclear to me why reducing the commit interval does not give you "quicker updates" downstream. - Matthias J. Sax
Also, why does a custom transformer help (i.e., the answer you accepted)? If you feed the result of the first aggregation into the transformer, you would see the same behavior: upstream caching would not forward each update to the transformer. You would need to disable caching upstream, again. - Matthias J. Sax
Btw: you can also disable caching globally by setting cache.max.bytes.buffering to zero in StreamsConfig. - Matthias J. Sax
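
The global setting mentioned in the last comment could be sketched as follows. This only builds a plain java.util.Properties object using the config key quoted in the comment; the object and method names are hypothetical, and the application id, bootstrap servers and serdes that a real Kafka Streams application needs are left out.

```scala
import java.util.Properties

object CachingConfigSketch {
  // Returns properties that turn off record caching for the whole topology,
  // so every aggregation update is forwarded downstream instead of being buffered.
  def noCacheProps(): Properties = {
    val props = new Properties()
    // Config key taken from the comment above; "0" disables the cache.
    props.setProperty("cache.max.bytes.buffering", "0")
    props
  }
}
```

These properties would then be merged with the rest of the application's configuration before the streams instance is created.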

1 Answer


For your use case it is better to use the Processor API. The Processor API can easily be combined with the Kafka Streams DSL (Processor API integration).

You have to implement a custom Transformer that aggregates your messages for a particular key in a state store. When a message with startNewGroup=true arrives, the aggregate collected so far for that key is forwarded downstream and a new aggregation starts.

Your sample Transformer might look as follows:

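import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

// Value and Agg are the application's own types (not shown here).
// The third type parameter is the record type handed to KStream#transform, i.e. a KeyValue.
case class CustomTransformer(storeName: String) extends Transformer[String, Value, KeyValue[String, Agg]] {

  // Both fields are assigned in init(), which is called before the first transform().
  private var stateStore: KeyValueStore[String, Agg] = null
  private var context: ProcessorContext = null

  override def init(context: ProcessorContext): Unit = {
    this.context = context
    stateStore = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, Agg]]
  }

  override def transform(key: String, value: Value): KeyValue[String, Agg] = {
    val maybeAgg = Option(stateStore.get(key))

    if (value.startNewGroup) {
      // Emit the finished group (if any), then start a new aggregate from this value.
      maybeAgg.foreach(agg => context.forward(key, agg))
      stateStore.put(key, Agg(value))
    } else {
      // Same group: merge into the stored aggregate, or create it on first sight of the key.
      stateStore.put(key, maybeAgg.map(_.merge(value)).getOrElse(Agg(value)))
    }
    // Results are emitted through context.forward above, so nothing is returned here.
    null
  }

  override def close(): Unit = {}
}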
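
To see what the transformer above would emit for the sample stream from the question, here is a dependency-free simulation of its transform logic. It is only a sketch: a mutable Map stands in for the state store, a Vector builder stands in for context.forward, and the Value and Agg case classes are hypothetical simplifications that keep just the ids.

```scala
import scala.collection.mutable

object TransformerSketch {
  // Hypothetical, simplified versions of the answer's Value and Agg types.
  final case class Value(key1: String, id: String, startNewGroup: Boolean)
  final case class Agg(ids: Vector[String]) {
    def merge(v: Value): Agg = Agg(ids :+ v.id)
  }

  // The sample stream S from the question, in arrival order.
  val sample: Seq[Value] = Seq(
    Value("123", "1", false), Value("234", "2", false),
    Value("123", "3", false), Value("123", "4", true),
    Value("234", "5", false), Value("123", "6", false),
    Value("123", "7", false), Value("123", "8", true)
  )

  // Returns (records forwarded downstream, aggregates still held in the store).
  def run(input: Seq[Value]): (Vector[(String, Agg)], Map[String, Agg]) = {
    val store = mutable.Map.empty[String, Agg]        // stands in for the KeyValueStore
    val forwarded = Vector.newBuilder[(String, Agg)]  // stands in for context.forward
    input.foreach { v =>
      val maybeAgg = store.get(v.key1)
      if (v.startNewGroup) {
        // Forward the finished group, then start a new one from this value.
        maybeAgg.foreach(agg => forwarded += ((v.key1, agg)))
        store.update(v.key1, Agg(Vector(v.id)))
      } else {
        // Merge into the open group, or create it on first sight of the key.
        store.update(v.key1, maybeAgg.map(_.merge(v)).getOrElse(Agg(Vector(v.id))))
      }
    }
    (forwarded.result(), store.toMap)
  }
}
```

With the sample input, the groups with ids 1,3 and 4,6,7 are forwarded under key 123, while the still-open groups (ids 8 for key 123 and ids 2,5 for key 234) stay in the store until a later record with startNewGroup=true arrives for their key.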