I'm trying to use completionTimeout
in an akka streams flow. I've provided a contrived example where the flow takes 10 seconds but I've added a completionTimeout
with a timeout of 1 second. I would expect this flow to timeout after 1 second. However, in the example the flow completes in 10 seconds without any errors.
Why doesn't the flow timeout? Is there a better way to timeout a flow?
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
class Test extends FlatSpec with Matchers {
implicit val system = ActorSystem("test")
"This Test" should "fail but passes and I don't know why" in {
//This takes 10 seconds to complete
val flow: Flow[String, String, NotUsed] = Flow[String]
.map(str => {
println(s"Processing ${str}")
Thread.sleep(10000)
})
.map(_ => {"Done!"})
val future: Future[String] =
Source.single("Input")
.via(flow)
.completionTimeout(1 second) // Set a timeout of 1 second
.runWith(Sink.last)
val result = Await.result(future, 15 seconds)
result should be("Done!")
}
}