The following Scala snippet doesn't seem to return:

val queue = 
  Source.queue[Unit](10, OverflowStrategy.fail)
    .throttle(1, 1 second, 1, ThrottleMode.shaping)
    .to(Sink.ignore)
    .run()

Await.result(
  (1 to 15).map(_ => queue.offer(())).last,
  Duration.Inf)

Is this a bug in Akka Streams, or am I doing something wrong?

EDIT: To circle back, this was reported and accepted as a bug in Akka: https://github.com/akka/akka/issues/23078

1 Answer

This program gives more insight into what happens here:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.Await
import scala.concurrent.duration._

object Test extends App {
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import actorSystem.dispatcher

  val (queue, finalFuture) =
    Source.queue[Unit](10, OverflowStrategy.fail)
      .map(_ => println("Before throttle"))
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .map(_ => println("After throttle"))
      .toMat(Sink.ignore)(Keep.both)
      .run()

  finalFuture.onComplete(r => println(s"Materialized future from ignore completed: $r"))

  Await.result((1 to 25).map(_ => queue.offer(()).map(e => println(s"Offer result: $e"))).last, Duration.Inf)
}

Most of the time it prints the following for me and then hangs:

Offer result: Enqueued
After throttle
Before throttle
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)

But sometimes it finishes with an exception instead:

Before throttle
After throttle
Before throttle
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
Exception in thread "main" java.lang.IllegalStateException: Stream is terminated. SourceQueue is detached
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:57)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:56)
    at akka.stream.stage.CallbackWrapper$$anonfun$invoke$1.apply$mcV$sp(GraphStage.scala:1373)
    at akka.stream.stage.CallbackWrapper$class.akka$stream$stage$CallbackWrapper$$locked(GraphStage.scala:1379)
    at akka.stream.stage.CallbackWrapper$class.invoke(GraphStage.scala:1369)
    at akka.stream.impl.QueueSource$$anon$1.invoke(Sources.scala:47)
    at akka.stream.impl.QueueSource$$anon$2.offer(Sources.scala:180)
    at test.Test$$anonfun$4.apply(Test.scala:25)
    at test.Test$$anonfun$4.apply(Test.scala:25)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at test.Test$.delayedEndpoint$test$Test$1(Test.scala:25)
    at test.Test$delayedInit$body.apply(Test.scala:10)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)

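That is, you're seeing concurrency in action: the offers you submit race with the stream itself. One of them overflows the buffer and completes with a failure, which also fails the stream. If the stream has already terminated by the time a later offer() is called, that call throws the IllegalStateException shown in the stack trace above, and the program exits with an exception. Much more often, though, the remaining futures simply never complete, and since you await the last one, you get an infinite await.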

To determine whether your stream has actually terminated, you have to observe it directly, as is done above with the materialized future of Sink.ignore. Most importantly, you should not push more than the configured number of elements into the queue at once. If you do need to push more and you use OverflowStrategy.backpressure, you must always wait for the future returned by the previous offer() to complete before calling offer() again.
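
For illustration, here is a minimal sketch of the backpressure approach, assuming the same Akka 2.5-era API as the program above (ActorMaterializer, Source.queue with a buffer size and an overflow strategy). The names SequentialOffers and offerSequentially are made up for this example and are not part of Akka. Each offer() is issued only after the previous future has completed, and the sink's materialized future is kept so the stream's termination can still be observed:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object; not part of Akka or of the original program.
object SequentialOffers {

  // Chains n offers so that each offer() is called only after the
  // future of the previous offer has completed.
  def offerSequentially(queue: SourceQueueWithComplete[Unit], n: Int)
                       (implicit ec: ExecutionContext): Future[QueueOfferResult] =
    (1 to n).foldLeft(Future.successful[QueueOfferResult](QueueOfferResult.Enqueued)) {
      (previous, _) => previous.flatMap(_ => queue.offer(()))
    }

  def main(args: Array[String]): Unit = {
    implicit val actorSystem: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import actorSystem.dispatcher

    val (queue, finalFuture) =
      Source.queue[Unit](10, OverflowStrategy.backpressure)
        .throttle(1, 1.second, 1, ThrottleMode.shaping)
        .toMat(Sink.ignore)(Keep.both)
        .run()

    // Observe the stream directly, as in the program above.
    finalFuture.onComplete(r => println(s"Stream completed: $r"))

    // 15 offers against a buffer of 10: the later offers wait for free space.
    val lastOffer = offerSequentially(queue, 15)
    println(s"Last offer result: ${Await.result(lastOffer, 1.minute)}")

    // Signal that no more elements will come, then wait for the stream to finish.
    queue.complete()
    Await.result(finalFuture, 1.minute)
    actorSystem.terminate()
  }
}
```

Note that a completed offer only means the element was accepted by the queue, not that it has passed the throttle. The buffered elements are still emitted at one per second after the last offer returns, so the timeouts here are deliberately generous.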