1
votes

The current Hazelcast Jet 0.6.1 code sample demonstrates aggregation based on a single field (e.g. ticker).

Here is a reference.

\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java

How can this be extended to group by more than one field, such as ticker and traderId?

Here is the current sample code from StockExchange.java:

 private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
     .addTimestamps(Trade::getTime, 3000)
     .groupingKey(Trade::getTicker)
     .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
     .aggregate(counting(),
             (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
     .drainTo(Sinks.logger());

    return p;
}

2 Answers

1
votes

For ticker & traderId you can use:

.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))

Generally, the key can be anything that implements equals and hashCode properly. Tuple2 is a generic container for two values.
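If a named type reads better than Tuple2, a small key class with equals and hashCode should work the same way. The following is only a sketch in plain JDK code, not Jet code: TickerTraderKey and countByKey are hypothetical names, trades are modelled as {ticker, traderId} string pairs, and Collectors.groupingBy stands in for the pipeline's groupingKey stage. The class is also marked Serializable on the assumption that keys may need to travel between cluster members.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class CompositeKeyDemo {

    /** Hypothetical composite key: ticker plus trader id. */
    static final class TickerTraderKey implements Serializable {
        private final String ticker;
        private final String traderId;

        TickerTraderKey(String ticker, String traderId) {
            this.ticker = ticker;
            this.traderId = traderId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TickerTraderKey)) return false;
            TickerTraderKey other = (TickerTraderKey) o;
            return Objects.equals(ticker, other.ticker)
                    && Objects.equals(traderId, other.traderId);
        }

        @Override
        public int hashCode() {
            // Must be derived from the same fields that equals compares.
            return Objects.hash(ticker, traderId);
        }

        @Override
        public String toString() {
            return ticker + "/" + traderId;
        }
    }

    /** Counts trades per (ticker, traderId); each trade is a {ticker, traderId} pair. */
    static Map<TickerTraderKey, Long> countByKey(List<String[]> trades) {
        return trades.stream().collect(Collectors.groupingBy(
                t -> new TickerTraderKey(t[0], t[1]),
                Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String[]> trades = Arrays.asList(
                new String[] {"AAPL", "t1"},
                new String[] {"AAPL", "t1"},
                new String[] {"AAPL", "t2"},
                new String[] {"MSFT", "t1"});
        countByKey(trades).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

In the pipeline itself, the equivalent would presumably be a groupingKey lambda that constructs such a key from trade.getTicker() and trade.getTraderId().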

0
votes

We can also build a comma-separated string from the field values and use it as the grouping key.

.aggregate(AggregateOperations.groupingBy(data -> {
    // Append each grouping field followed by a comma.
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append(StringUtils.defaultString(data.getSource1().get(dataValue) + "", "")).append(",");
    // Drop the trailing comma before returning the key.
    return stringBuilder.substring(0, stringBuilder.length() - 1);
}));
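One caveat with comma-separated keys: if a field value can itself contain a comma, two different field combinations may produce the same string and end up in the same group. The sketch below illustrates this in plain JDK code and shows one possible workaround, escaping the delimiter. naiveKey and escapedKey are hypothetical helpers, not part of Jet, and the sample values are made up.

```java
public class StringKeyDemo {

    /** Naive key: joins the fields with commas. */
    static String naiveKey(String... fields) {
        return String.join(",", fields);
    }

    /** Safer key: escapes backslashes and commas in each field before joining. */
    static String escapedKey(String... fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(fields[i].replace("\\", "\\\\").replace(",", "\\,"));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two different field pairs that flatten to the same naive string.
        String[] a = {"ACME,INC", "t1"};
        String[] b = {"ACME", "INC,t1"};

        System.out.println(naiveKey(a).equals(naiveKey(b)));     // true: the keys collide
        System.out.println(escapedKey(a).equals(escapedKey(b))); // false: the keys stay distinct
    }
}
```

A tuple or a dedicated key class avoids the problem entirely, since the fields are compared separately rather than as one flattened string.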