0
votes

I have a stream of numbers, delta, in a Kafka topic that needs to be aggregated in a special way, i.e.:

aggregate[0] = 0
aggregate[N] = aggregate[N-1] * (N - 1) / N + delta[N - 1] / N

(exact formula is irrelevant, note the dependency on the previous element in aggregate though)
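
To make the dependency concrete, here is a minimal in-memory sketch of the recurrence in plain Python, with no Kafka involved. The function name `running_aggregate` is only illustrative, and the formula is the example one from above, not necessarily the real one:

```python
def running_aggregate(deltas):
    """Return [aggregate[0], ..., aggregate[len(deltas)]] for the example recurrence."""
    aggregate = [0]  # aggregate[0] = 0
    for n, delta in enumerate(deltas, start=1):
        # aggregate[n] depends on aggregate[n-1] and on delta[n-1]
        aggregate.append(aggregate[-1] * (n - 1) / n + delta / n)
    return aggregate


print(running_aggregate([2, 4, 6]))  # [0, 2.0, 3.0, 4.0]
```

Each step needs the previous aggregate value, which is why the aggregate has to be available before the next delta is processed.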

Essentially, I need to subscribe to two Kafka topics simultaneously and advance through both in lockstep: when I read an item from the delta topic, I also need to read the corresponding item from the aggregate topic and write the result back to the aggregate topic before the next item in the delta topic is consumed.

Is this at all possible in kafka? Could ksql with a clever join help?

1
Please note that there are no guarantees about the order of elements within a topic. You only get ordering at the partition level. - TobiSH
@TobiSH I guess that's fine for me, as long as it doesn't result in a race condition - stefan

1 Answer

0
votes

Maybe this pseudocode helps. Assume there are two topics, "delta" and "aggregate", and that each has a single partition to keep things simple (so that we get a global order).

# This is just pseudocode to show the idea; Consumer and Producer are
# placeholders, not a real client API.
def demo():
    delta_consumer = Consumer("delta")
    aggregate_consumer = Consumer("aggregate")
    aggregate_producer = Producer("aggregate")

    # An empty aggregate topic means this is the first run.
    if aggregate_consumer.get_offset() == 0:
        last_aggregate_result = 0
    else:
        last_aggregate_result = aggregate_consumer.get_last_record()

    for delta_data in delta_consumer.poll():
        new_aggregate_result = user_define_function(delta_data, last_aggregate_result)
        aggregate_producer.produce(new_aggregate_result)
        # Keep the latest result in memory so the next step does not
        # depend on reading it back from the topic.
        last_aggregate_result = new_aggregate_result
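
To try the idea without a broker, here is a minimal sketch in which plain Python lists stand in for the two single-partition topics. The name `process_pending` and the list-based "topics" are assumptions made for illustration only, and the formula is the example one from the question. A real setup would also have to deal with consumer offsets and delivery guarantees, which this sketch ignores.

```python
def process_pending(delta_log, aggregate_log):
    """Append one aggregate per unprocessed delta.

    Both arguments are lists standing in for single-partition topics
    (an assumption for illustration, not a Kafka API).
    """
    if not aggregate_log:
        aggregate_log.append(0)  # first run: aggregate[0] = 0
    # aggregate_log holds aggregate[0..k], so k deltas were consumed already
    consumed = len(aggregate_log) - 1
    for delta in delta_log[consumed:]:
        n = len(aggregate_log)  # index of the aggregate about to be written
        previous = aggregate_log[-1]
        aggregate_log.append(previous * (n - 1) / n + delta / n)
    return aggregate_log


delta_topic, aggregate_topic = [2, 4], []
process_pending(delta_topic, aggregate_topic)
delta_topic.append(6)                          # a new delta arrives
process_pending(delta_topic, aggregate_topic)  # resumes where it left off
print(aggregate_topic)  # [0, 2.0, 3.0, 4.0]
```

Because the position in the delta log is derived from the length of the aggregate log, restarting the function does not reprocess deltas that already have an aggregate.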

Meanwhile, I think Kafka + Structured Streaming could handle your problem: what you essentially need is to compute an aggregate result over a streaming table and write that result to a Kafka topic, and Kafka + Structured Streaming is a good fit for that.