0 votes

I want to do a simple max across an entire dataset. I started with the Kafka example at: https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-maintenance/kafka/src/main/java/avro/KafkaAvroSource.java

I just changed the pipeline to:

p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
    .map(Map.Entry::getValue)
    .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
    .map(user -> (Integer) user.get(2))
    .drainTo(Sinks.list("result"));

and the client code to:

IListJet<Integer> res = jet.getList("result");
SECONDS.sleep(10);
System.out.println(res.get(0));
SECONDS.sleep(15);
System.out.println(res.get(0));
cancel(job);

to get the largest age of the people in the topic. However, it doesn't return 20 and seems to return different values on different runs. Any idea why?

You use Kafka, which is a streaming source, but you want the result for "the entire dataset". These two are at odds with each other: a Kafka source never completes, so you never get the final result, just the result so far. – Marko Topolnik
You're using a list sink, so each update appends to the end of the list; the value at index 0 never changes. Instead, use a map sink with a fixed key so that newer results overwrite the previous one. – Can Gencer

1 Answer

2 votes

You are using rollingAggregate, which produces a new output item every time it receives some input, but you only ever check the first item it emitted. Instead, you must look at the latest item it emitted. One way to achieve this is to push the result into an IMap sink, using the same key every time:

p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
 .withoutTimestamps()
 .map(Map.Entry::getValue)
 .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
 .map(user -> entry("user", (Integer) user.get(2)))
 .drainTo(Sinks.map("result"));

You can fetch the latest result with

IMap<String, Integer> result = jet.getMap("result");
System.out.println(result.get("user"));
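The underlying behaviour can be sketched without Jet at all: a rolling aggregation emits a running result after every input item, so only the latest emission reflects the whole stream seen so far. A minimal plain-Java illustration (names like `RollingMinSketch` are mine, not part of any Jet API):

```java
import java.util.ArrayList;
import java.util.List;

public class RollingMinSketch {
    public static void main(String[] args) {
        int[] ages = {25, 20, 30, 22};
        List<Integer> emissions = new ArrayList<>();
        int min = Integer.MAX_VALUE;
        // A rolling aggregate emits the running result after every item.
        for (int age : ages) {
            min = Math.min(min, age);
            emissions.add(min);
        }
        // Reading the first emission (like list index 0) gives a partial result.
        System.out.println("first emission: " + emissions.get(0));
        // Only the latest emission accounts for all input seen so far.
        System.out.println("latest emission: " + emissions.get(emissions.size() - 1));
    }
}
```

Running this prints `first emission: 25` and then `latest emission: 20`: the first emission only saw one item, which is exactly why reading index 0 of the list sink gave unstable answers. A map sink with a fixed key always holds the latest emission instead.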