1
votes

I'm trying to use completionTimeout in an akka streams flow. I've provided a contrived example where the flow takes 10 seconds but I've added a completionTimeout with a timeout of 1 second. I would expect this flow to timeout after 1 second. However, in the example the flow completes in 10 seconds without any errors.

Why doesn't the flow timeout? Is there a better way to timeout a flow?

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class Test extends FlatSpec with Matchers {

  implicit val system = ActorSystem("test")

  "This Test" should "fail but passes and I don't know why" in {

    //This takes 10 seconds to complete
    val flow: Flow[String, String, NotUsed] = Flow[String]
        .map(str => {
          println(s"Processing ${str}")
          Thread.sleep(10000)
        })
        .map(_ => {"Done!"})

    val future: Future[String] =
      Source.single("Input")
        .via(flow)
        .completionTimeout(1 second) // Set a timeout of 1 second
        .runWith(Sink.last)

    val result = Await.result(future, 15 seconds)

    result should be("Done!")
  }
}
1

1 Answers

4
votes

In executing a given stream, Akka Stream leverages operator fusion to fuse stream operators by a single underlying actor for optimal performance. For your main thread to catch the timeout, you could introduce asynchrony by means of .async:

val future: Future[String] =
  Source.single("Input")
    .via(flow)
    .async  // <--- asynchronous boundary
    .completionTimeout(1 second)
    .runWith(Sink.last)

future.onComplete(println)
// Processing Input
// Failure(java.util.concurrent.TimeoutException: The stream has not been completed in 1 second.)

An alternative to introduce asynchrony is to use the mapAsync flow stage:

val flow: Flow[String, String, NotUsed] = Flow[String]
  .map(str => {
    println(s"Processing ${str}")
    Thread.sleep(10000)
  })
  .mapAsync(1)(_ => Future("Done!"))  // <--- asynchronous flow stage

Despite getting the same timeout error, you may notice it'll take ~10s to see result when using mapAsync, whereas only ~1s using async. That's because while mapAsync introduces an asynchronous flow stage, it's not an asynchronous boundary (like what async does) and is still subject to operator fusion.