
I have to use Kafka Streams to read the transaction info from the transaction result topic and draw a candlestick chart for specific durations. Each record has a transaction id, amount, price and deal time; the key is the transaction id, which is different for every record. What I want to do is compute, from the transaction results, the highest price, lowest price, open price, close price and tx close_time for each duration, and use them to create a candlestick chart. I have used the Kafka Streams window to do this:

final KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);
// group by the (unique) txId key so the stream can be windowed
KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupByKey(keySerde, valueSerde);
KTable<Windowed<String>, InfoRecord> kTableRecords = groupedStream.aggregate(
        InfoRecord::new, /* initializer */
        (k, v, aggregate) -> aggregate.add(k, v), /* adder */
        TimeWindows.of(TimeUnit.SECONDS.toMillis(5)).until(TimeUnit.SECONDS.toMillis(5)),
        infoRecordSerde);

As each record in the source topic has the txId as the key, and the txId is never duplicated, the aggregated K-table will contain the same records as the K-stream, but I could use the window to get all the records in a specific duration.

I think kTableRecords should contain all the records of a specific duration, i.e. the 5 seconds, so I could loop over all the records in those 5 seconds to get the high, the low, the open (the price of the very first record in the window), the close (the price of the very last record in the window) and the close_time (the tx time of the very last record in the window). That way I would get only one record for the window and could output this result to a sink Kafka topic, but I don't know how to do this within these window durations.

I think the code will be like:

kTableRecords.foreach((key, value) -> {
    // TODO: Add Logic Here
});

The IDE shows that this foreach has been deprecated.

But I don't know how to tell whether a record belongs to this window or to the next one, or whether I need a window record retention time using until as in the sample code above.
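From what I can tell, the Windowed key does carry the window boundaries, so a sketch like the following (reusing the kTableRecords above) should at least show which window each update belongs to:

kTableRecords.toStream().foreach((windowedKey, record) -> {
    // the Windowed key wraps the original txId plus the window boundaries
    long windowStart = windowedKey.window().start();
    long windowEnd = windowedKey.window().end();
    String txId = windowedKey.key();
    System.out.println("[" + windowStart + "/" + windowEnd + "] " + txId + " -> " + record);
});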

I have struggled with this for several days and I still don't know the correct way to complete my job. I would appreciate anyone's help in setting me on the right path, thanks.

Kafka version is: 0.11.0.0

Update:

With the hints from Michal in his post, I changed my code and now do the high, low, open and close price calculation in the aggregator instance. But the results made me realize that, for each distinct key in a specific window, the logic creates a new aggregator instance for that key and runs the add executions for the current key only, without interacting with the values of other keys. What I really want is to calculate the high, low, open and close price over the records of all the different keys within that window duration. So I don't need a new instance per key; there should be only one aggregate instance per specific window, doing the calculation over all the record values in that duration, so that each duration window yields one set of (high, low, open, close) prices. I have read the topic: How to compute windowed aggregations over successively increasing timed windows? So I am in doubt; I am not sure if this is the right solution for me, thanks.

By the way, K-line means Candlestick chart.


UPDATE II:

Based on your updates, I created the code as shown below:

KStream<String, JsonNode> transactionKStream = builder.stream(keySerde, valueSerde, srcTopicName);

KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy((k,v)-> "constkey", keySerde, valueSerde);

KTable<Windowed<String>, MarketInfoRecord> kTable =
        groupedStream.aggregate(
        MarketInfoRecord::new, /* initializer */
        (k, v, aggregate) -> aggregate.add(k,v), /* adder */
        TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100)),
        infoRecordSerde, "test-state-store");

KStream<String, MarketInfoRecord> newS = kTable.toStream().map(
        (k, v) -> {
            System.out.println("key: " + k + ",  value:" + v);
            return KeyValue.pair(k.window().start() + "_" + k.window().end(), v);
        }
);

newS.to(Serdes.String(),infoRecordSerde, "OUTPUT_NEW_RESULT");

If I use a static string as the key when grouping, then when doing the windowed aggregation only one aggregator instance is created per window, and we can get the (high, low, open, close) over all the records in that window. But since the key is the same for every record, the window gets updated several times and produces several records for one window, as:

key: [constkey@1521304400000/1521304500000],  value:MarketInfoRecord{high=11, low=11, openTime=1521304432205, closeTime=1521304432205, open=11, close=11, count=1}
key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=44, low=44, openTime=1521304622655, closeTime=1521304622655, open=44, close=44, count=1}
key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=44, low=33, openTime=1521304604182, closeTime=1521304622655, open=33, close=44, count=2}
key: [constkey@1521304400000/1521304500000],  value:MarketInfoRecord{high=22, low=22, openTime=1521304440887, closeTime=1521304440887, open=22, close=22, count=1}
key: [constkey@1521304600000/1521304700000],  value:MarketInfoRecord{high=55, low=55, openTime=1521304629943, closeTime=1521304629943, open=55, close=55, count=1}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=77, low=77, openTime=1521304827181, closeTime=1521304827181, open=77, close=77, count=1}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=77, low=66, openTime=1521304817079, closeTime=1521304827181, open=66, close=77, count=2}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=88, low=66, openTime=1521304817079, closeTime=1521304839047, open=66, close=88, count=3}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=99, low=66, openTime=1521304817079, closeTime=1521304848350, open=66, close=99, count=4}
key: [constkey@1521304800000/1521304900000],  value:MarketInfoRecord{high=100.0, low=66, openTime=1521304817079, closeTime=1521304862006, open=66, close=100.0, count=5}

So we need to do dedup, as the link you posted ("38945277/7897191") describes, right?

So, I want to know if I could do something like:

KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupByKey();
// as the key is a unique txId, this grouping is only there to enable the next
// window operation; the number of records is not changed

KTable<Windowed<String>, MarketInfoRecord> kTable =
    groupedStream.SOME_METHOD(
        // just use some method to deliver the records into different windows,
        // not sure if this is possible?
        TimeWindows.of(TimeUnit.SECONDS.toMillis(100)).until(TimeUnit.SECONDS.toMillis(100))
        // use until here to let the records be purged once they fall out of the window,
        // please correct me if I am wrong?

We could transform the time-based series of input records into several window-based groups. Each group would have the window (or the window start time and end time combined into a string) as its key, so the key differs from group to group, but each group holds several records with different values. Then we do an aggregation over those values (no windowed aggregation is needed here), and from each key:value pair we get one result record. The next window gets a different window-based key, so in this way the downstream execution should have multiple threads (as the key changes).
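For the downstream side, what I have in mind, building on the newS stream from UPDATE II (the intermediate topic name is just made up), is roughly:

// write through an intermediate topic so the data is repartitioned by the
// new window-based key; processing after this point can then use multiple
// stream threads again (one task per partition)
KStream<String, MarketInfoRecord> byWindow =
        newS.through(Serdes.String(), infoRecordSerde, "CANDLES_BY_WINDOW");

byWindow.foreach((windowKey, candle) -> System.out.println(windowKey + " -> " + candle));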

This is something that I would like to do myself as well. - Arefe

1 Answer


I suggest you do all the calculations you mention not in a foreach but directly in your aggregator, that is, in the adder:

(k, v, aggregate) -> aggregate.add(k,v), /* adder */

The add method can do all the things you mentioned (I suggest you first map the JsonNode to a Java object; let's call it Transaction). Consider this pseudo-code:

private int low = Integer.MAX_VALUE; // whatever type you use to represent prices
private int high = Integer.MIN_VALUE;
private long openTime = Long.MAX_VALUE; // whatever type you use to represent time
private long closeTime = Long.MIN_VALUE;
...
public InfoRecord add(String key, Transaction tx) {
  if(tx.getPrice() > this.high) this.high = tx.getPrice();
  if(tx.getPrice() < this.low) this.low = tx.getPrice();
  if(tx.getTime() < this.openTime) {
    this.openTime = tx.getTime();
    this.open = tx.getPrice();
  }
  if(tx.getTime() > this.closeTime) {
    this.closeTime = tx.getTime();
    this.close = tx.getPrice();
  }
  return this;
}
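Just as an illustration of the mapping step (the JSON field names below are assumptions, adjust them to your actual payload), something like:

// hypothetical value object for the aggregation; field names are assumptions
public class Transaction {
    private final String txId;
    private final double price;
    private final long time; // deal time, epoch millis

    public Transaction(String txId, double price, long time) {
        this.txId = txId;
        this.price = price;
        this.time = time;
    }

    public double getPrice() { return price; }
    public long getTime() { return time; }
}

// map the JsonNode values to Transaction before grouping and aggregating
KStream<String, Transaction> txStream = transactionKStream.mapValues(json ->
        new Transaction(
                json.get("txId").asText(),
                json.get("price").asDouble(),
                json.get("dealTime").asLong()));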

Keep in mind that you may in reality get more than one record on output for each window as the windows can be updated multiple times (they're never final) as is explained in more detail here: https://stackoverflow.com/a/38945277/7897191
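For example, one way to reduce (though not fully eliminate) those intermediate updates in 0.11, sketched here under the assumption that you configure the application through a Properties object, is to size the record cache and the commit interval:

Properties props = new Properties(); // java.util.Properties
// a larger cache lets more updates of the same window key be compacted in memory
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// cached updates are flushed (and emitted downstream) at most on each commit
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000L);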

I don't know what a K-line is, but if you want multiple windows of increasing duration, the pattern is outlined here.
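As a plain illustration (not necessarily the pattern from that link), assuming the groupedStream and infoRecordSerde from your UPDATE II, one straightforward DSL variant is simply to define one windowed aggregation per duration:

// one windowed aggregation (and one state store) per candle duration; store names are made up
for (final long seconds : new long[]{60, 300, 900}) {
    KTable<Windowed<String>, MarketInfoRecord> candles = groupedStream.aggregate(
            MarketInfoRecord::new, /* initializer */
            (k, v, aggregate) -> aggregate.add(k, v), /* adder */
            TimeWindows.of(TimeUnit.SECONDS.toMillis(seconds)),
            infoRecordSerde,
            "candles-" + seconds + "s-store");
    // each resulting KTable can then be converted with toStream() and written to its own topic
}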

UPDATE: To aggregate all records in a window, just change the key to some static value before doing the aggregation. So to create your grouped stream you can use groupBy(KeyValueMapper), something like:

KGroupedStream<String, JsonNode> groupedStream = transactionKStream.groupBy( (k, v) -> ""); // give all records the same key (empty string)

Please be aware that this will cause repartitioning (since the partition is determined by the key and we're changing the key) and the execution downstream will become single-threaded (since there will now be just one partition).