I'm trying out Akka Streams, but I can't get backpressure to work in my simple example. I am admittedly not experienced with Akka (Streams), so I'm probably missing something big.
I am producing (offering on a queue) integers faster than consuming them, so I thought that backpressure would kick in. My goal is to always consume the most recent item that was put in the queue (that's why I have bufferSize = 1 and OverflowStrategy.dropHead() on the source queue).
public class SimpleStream {
    public static void main(String[] argv) throws InterruptedException {
        final ActorSystem system = ActorSystem.create("akka-streams");
        final Materializer materializer = ActorMaterializer.create(system);
        final Procedure<Integer> slowConsumer = (i) -> {
            System.out.println("consuming [" + i + "]");
            ThreadUtils.sleepQuietly(1000, TimeUnit.MILLISECONDS);
        };
        final SourceQueue<Integer> q = Sink
            .<Integer>foreach(slowConsumer)
            .runWith(Source.<Integer>queue(1, OverflowStrategy.dropHead()), materializer);
        final AtomicInteger i = new AtomicInteger(0);
        final Thread t = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                int n = i.incrementAndGet();
                q.offer(n);
                System.out.println("produced: [" + n + "]");
                ThreadUtils.sleepQuietly(500, TimeUnit.MILLISECONDS);
            }
        });
        t.setName("ticking");
        t.start();
        // run some time... to observe the effects.
        ThreadUtils.sleepQuietly(1, TimeUnit.HOURS);
        t.interrupt();
        t.join();
        // eventually shutdown akka here...
    }
}
However, this is the result:
produced: [1]
consuming [1]
produced: [2]
produced: [3]
consuming [2] <-- Expected to be consuming 3 here.
produced: [4]
produced: [5]
consuming [3] <-- Expected to be consuming 5 here.
produced: [6]
produced: [7]
Please ignore the threading code; it is only there to fake getting data from an external source (as would happen if I had to use this in a real project).
Any idea of what I'm missing?
Source.queue. You can call its offer as many times as possible. You need to check what offer returns. You most likely want the producer to be independent from the consumer queue. Take a look at MergeHub. Perhaps it will work better for you. – expert
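To make the intended behaviour concrete (a one-element buffer where a newer item replaces the older one, and where offer reports what happened), here is a minimal plain-JDK sketch. It is not Akka code and does not reproduce how Source.queue works internally; the class LatestValueSlot and its methods are hypothetical names made up for this illustration.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical one-element "latest value wins" slot (not part of Akka).
 * It models the semantics the question asks for: buffer size 1, and a
 * newly offered element displaces the one still waiting.
 */
public class LatestValueSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    /**
     * Stores the value and returns the element it displaced, if any.
     * An empty result means the slot was free and nothing was dropped.
     */
    public Optional<T> offer(T value) {
        Objects.requireNonNull(value, "value");
        return Optional.ofNullable(slot.getAndSet(value));
    }

    /** Takes the newest element and leaves the slot empty. */
    public Optional<T> poll() {
        return Optional.ofNullable(slot.getAndSet(null));
    }

    public static void main(String[] args) {
        LatestValueSlot<Integer> q = new LatestValueSlot<>();

        // The producer offers 2 and then 3 while the consumer is busy.
        Optional<Integer> first = q.offer(2);   // slot was free
        Optional<Integer> second = q.offer(3);  // 2 was still waiting

        System.out.println("dropped by first offer:  " + first.isPresent());
        System.out.println("dropped by second offer: " + second.isPresent());

        // The consumer becomes free and takes whatever is newest.
        q.poll().ifPresent(n -> System.out.println("consuming [" + n + "]"));
    }
}
```

The point of returning the displaced element from offer is the same as in the comment above: the producer can only tell whether an item was kept or dropped by inspecting the result of each offer call.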