I thought this would be of normal difficulty, I was wrong. One thing I wanted to do is to ensure that the flow counting items that pass through the stream does not keep a reference to each item it sees: if many items pass in the aggregation period, you will end up with an unnecessarily big list in memory (even if only for a second) and the performance penalty to add (many) items to it. The following solution, although complex, keeps only a counter.
NOTE: Although I tested the happy scenario, I cannot say this is battle-proven, so use with caution!
Based on Akka's GroupedWeightedWithin
and the documentation here:
public class CountInPeriod<T> extends GraphStage<FlowShape<T, Integer>> {
public Inlet<T> in = Inlet.<T>create("CountInPeriod.in");
public Outlet<Integer> out = Outlet.<Integer>create("CountInPeriod.out");
private FlowShape<T, Integer> shape = FlowShape.of(in, out);
private Duration duration;
public CountInPeriod(Duration duration) {
this.duration = duration;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new TimerGraphStageLogic(shape) {
private int counter = 0;
private int bufferPushCounter = -1;
{
setHandler(in, new AbstractInHandler() {
@Override public void onPush() throws Exception, Exception {
grab(in);
counter++;
pull(in);
}
});
setHandler(out, new AbstractOutHandler() {
@Override public void onPull() throws Exception, Exception {
if (bufferPushCounter >= 0) {
push(out, bufferPushCounter);
bufferPushCounter = -1;
}
}
});
}
@Override
public void preStart() throws Exception, Exception {
scheduleWithFixedDelay(CountInPeriod.class, duration, duration);
pull(in);
}
@Override
public void onTimer(Object timerKey) throws Exception, Exception {
if (isAvailable(out)) emitCounter();
else bufferPush();
}
private void emitCounter() {
push(out, counter);
counter = 0;
bufferPushCounter = -1;
}
private void bufferPush() {
bufferPushCounter = counter;
counter = 0;
}
};
}
@Override
public FlowShape<T, Integer> shape() {
return shape;
}
}
Test code:
public class GroupTicked {
final static ActorSystem as = ActorSystem.create("as");
public static void main(String... args) throws Exception {
CompletionStage<Done> done = Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
.take(7) // to finish in finite time...
.via(new CountInPeriod<>(Duration.ofSeconds(1)))
.runWith(Sink.foreach(e -> System.out.println(System.currentTimeMillis() + " -> " + e)), as);
done.thenAccept(x -> as.terminate());
}
}