In the following code, tick emits a new object every three seconds. I'm trying to count the number of emitted objects every second using groupedWithin (which ignores empty groups). Is there any way in Akka Streams for the following code to print 0 in periods when tick does not emit any objects?

Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(List::size)
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);

In other words, I'd like the output of this code to be this sequence: 1 0 0 1 0 0 1 ... (every second) instead of 1 1 1 ... (every three seconds).
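
To make the target output concrete, here is a minimal plain-Java sketch (no Akka involved) of the per-second counting described above. The class `WindowCounts` and the method `countPerWindow` are hypothetical names used only for illustration, and timestamps are assumed to be non-negative milliseconds since the stream started. It only models the arithmetic; it is not a streaming solution.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Akka): counts events per fixed window and keeps empty windows.
final class WindowCounts {

    // eventTimesMs: event timestamps in milliseconds since start (assumed non-negative).
    // windowMs: window length in milliseconds; windows: number of consecutive windows to report.
    static List<Integer> countPerWindow(long[] eventTimesMs, long windowMs, int windows) {
        int[] counts = new int[windows];
        for (long t : eventTimesMs) {
            int idx = (int) (t / windowMs); // window [idx * windowMs, (idx + 1) * windowMs)
            if (idx < windows) {
                counts[idx]++;
            }
        }
        List<Integer> result = new ArrayList<>();
        for (int c : counts) {
            result.add(c); // windows without events contribute an explicit 0
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ticks = {0, 3000, 6000}; // one tick every three seconds
        System.out.println(countPerWindow(ticks, 1000, 7)); // prints [1, 0, 0, 1, 0, 0, 1]
    }
}
```

The point is that windows with no events still produce a 0, which is exactly what groupedWithin skips.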

EDIT: This is the best workaround I have come up with so far (using keepAlive to send some special objects if the upstream is idle):

Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .keepAlive(Duration.ofSeconds(1), KeepAliveElement::new)
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(lst -> lst.stream().filter(e -> !(e instanceof KeepAliveElement)).collect(Collectors.toList()))
    .map(List::size)
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);

Is there any better way to do this?


1 Answer

I thought this would be of normal difficulty, but I was wrong. One thing I wanted to ensure is that the flow counting the items passing through the stream does not keep a reference to each item it sees: if many items pass during the aggregation period, you end up with an unnecessarily big list in memory (even if only for a second), plus the performance penalty of adding (many) items to it. The following solution, although complex, keeps only a counter.

NOTE: Although I tested the happy scenario, I cannot say this is battle-tested, so use with caution!

Based on Akka's GroupedWeightedWithin and the documentation here:

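import java.time.Duration;

import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.TimerGraphStageLogic;

/** Emits, once per period, the number of elements received during that period (0 if none). */
public class CountInPeriod<T> extends GraphStage<FlowShape<T, Integer>> {
    public final Inlet<T> in = Inlet.<T>create("CountInPeriod.in");
    public final Outlet<Integer> out = Outlet.<Integer>create("CountInPeriod.out");
    private final FlowShape<T, Integer> shape = FlowShape.of(in, out);
    private final Duration duration;

    public CountInPeriod(Duration duration) {
        this.duration = duration;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new TimerGraphStageLogic(shape) {
            // number of elements seen in the current period
            private int counter = 0;
            // count waiting for downstream demand; -1 means nothing is buffered
            private int bufferPushCounter = -1;

            {
                setHandler(in, new AbstractInHandler() {
                    @Override public void onPush() throws Exception {
                        grab(in); // discard the element itself; only the count is kept
                        counter++;
                        pull(in);
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override public void onPull() throws Exception {
                        // downstream is ready: flush a count that could not be pushed earlier
                        if (bufferPushCounter >= 0) {
                            push(out, bufferPushCounter);
                            bufferPushCounter = -1;
                        }
                    }
                });
            }

            @Override
            public void preStart() throws Exception {
                // fire onTimer once per period and start pulling from upstream right away
                scheduleWithFixedDelay(CountInPeriod.class, duration, duration);
                pull(in);
            }

            @Override
            public void onTimer(Object timerKey) throws Exception {
                if (isAvailable(out)) emitCounter();
                else bufferPush();
            }

            private void emitCounter() {
                push(out, counter);
                counter = 0;
                bufferPushCounter = -1;
            }

            // Note: a count that is already buffered gets overwritten if downstream
            // has still not pulled by the time the next timer fires.
            private void bufferPush() {
                bufferPushCounter = counter;
                counter = 0;
            }
        };
    }

    @Override
    public FlowShape<T, Integer> shape() {
        return shape;
    }
}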
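
The counter and buffer bookkeeping of the stage above can be exercised without running Akka at all. The following is only a rough plain-Java model of that logic, not an Akka stage: `CountInPeriodModel` is a hypothetical name, the `demand` flag stands in for `isAvailable(out)`, and the callbacks are invoked by hand instead of by the stream interpreter.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the counter/buffer bookkeeping; it does not use Akka.
final class CountInPeriodModel {
    private int counter = 0;            // elements seen in the current period
    private int bufferPushCounter = -1; // count waiting for demand, -1 if none
    private boolean demand = false;     // stands in for isAvailable(out)
    final List<Integer> emitted = new ArrayList<>();

    // An element arrived from upstream: only the count is kept.
    void onPush() {
        counter++;
    }

    // Downstream signalled demand: flush a buffered count if there is one.
    void onPull() {
        demand = true;
        if (bufferPushCounter >= 0) {
            push(bufferPushCounter);
            bufferPushCounter = -1;
        }
    }

    // The period elapsed: emit the count now, or park it until the next pull.
    void onTimer() {
        if (demand) {
            push(counter);
        } else {
            bufferPushCounter = counter;
        }
        counter = 0;
    }

    private void push(int value) {
        emitted.add(value);
        demand = false; // a push consumes the outstanding demand
    }

    public static void main(String[] args) {
        CountInPeriodModel m = new CountInPeriodModel();
        m.onPull();  // downstream asks for the first count
        m.onPush();  // one element arrives
        m.onTimer(); // period ends with demand present: emits 1
        m.onPull();
        m.onTimer(); // empty period: emits 0
        m.onPush();  // one element arrives while downstream is slow
        m.onTimer(); // no demand: the count 1 is buffered
        m.onPull();  // the buffered count is flushed
        System.out.println(m.emitted); // prints [1, 0, 1]
    }
}
```

The model also makes the trade-off visible: only one count can wait in the buffer, so a second timer firing before the next pull would replace it.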

Test code:

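import java.time.Duration;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class GroupTicked {
    static final ActorSystem as = ActorSystem.create("as");

    public static void main(String... args) throws Exception {
        CompletionStage<Done> done = Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
                .take(7) // to finish in finite time...
                .via(new CountInPeriod<>(Duration.ofSeconds(1)))
                // passing the ActorSystem directly to runWith requires Akka 2.6+
                .runWith(Sink.foreach(e -> System.out.println(System.currentTimeMillis() + " -> " + e)), as);
        // shut the actor system down once the stream completes
        done.thenAccept(x -> as.terminate());
    }
}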