16
votes

I've been working up my answer to Is there a standard Scala function for running a block with a timeout?, and have run into a problem if an exception is thrown in a Future.

  def runWithTimeout[T](timeoutMs: Long)(f: => T) : Option[T] = {
    awaitAll(timeoutMs, future(f)).head.asInstanceOf[Option[T]]
  }

So that

runWithTimeout(50) { "result" } should equal (Some("result"))
runWithTimeout(50) { Thread.sleep(100); "result" } should equal (None)

But if I throw an exception in my block it doesn't leak, but is swallowed - so that the following fails with "..no exception was thrown"

intercept[Exception] {
    runWithTimeout(50) { throw new Exception("deliberate") }
}.getMessage should equal("deliberate")

Syserr has a stack trace with the message

<function0>: caught java.lang.Exception: deliberate

but I can't find where in the Scala runtime that is printed.

Apart from wrapping f in another block which catches exceptions and propagates them if thrown, is there any way to persuade awaitAll and/or Future to throw?

6
It is probably printed because it got passed to the thread's UncaughtExceptionHandler. You could set your own handler, but that would still not allow you to throw the exception in a different thread.Kim Stebel
Have a look at Fingales futures (github.com/twitter/finagle), search for "Timeout" and Akka akka.io/docs/akka/1.1.2/scala/futures.htmloluies

6 Answers

15
votes

Short answer: no.

Exceptions don't do what you want when you're working in a threaded context, because you want to know about the exception in the caller, and the exception happens in the future's thread.

Instead, if you want to know what the exception was, you should return an Either[Exception,WhatYouWant]--of course, you have to catch that exception within the future and package it up.

scala> scala.actors.Futures.future{
  try { Right("fail".toInt) } catch { case e: Exception => Left(e) }
}
res0: scala.actors.Future[Product with Serializable with Either[Exception,Int]] = <function0>

scala> res0()   // Apply the future
res1: Product with Serializable with Either[Exception,Int] =
      Left(java.lang.NumberFormatException: For input string: "fail")
10
votes

Disclaimer: I work for Typesafe

Or.... you could use Akka and it would give you what you want without you having to go through hoops for it.

val f: Future[Int] = actor !!! message

Then

    f.get 

Will throw the exception that happened in the actor

    f.await.exception 

will give you an Option[Throwable]

2
votes

Working my way through @Rex Kerr's suggestion, I've created

object Timeout {

  val timeoutException = new TimeoutException

  def runWithTimeout[T](timeoutMs: Long)(f: => T) : Either[Throwable, T] = {
    runWithTimeoutIgnoreExceptions(timeoutMs)(exceptionOrResult(f)) match {
      case Some(x) => x
      case None => Left(timeoutException)
    }
  }

  def runWithTimeout[T](timeoutMs: Long, default: T)(f: => T) : Either[Throwable, T] = {
    val defaultAsEither: Either[Throwable, T] = Right(default)
    runWithTimeoutIgnoreExceptions(timeoutMs, defaultAsEither)(exceptionOrResult(f))
  }

  def runWithTimeoutIgnoreExceptions[T](timeoutMs: Long)(f: => T) : Option[T] = {
    awaitAll(timeoutMs, future(f)).head.asInstanceOf[Option[T]]
  }

  def runWithTimeoutIgnoreExceptions[T](timeoutMs: Long, default: T)(f: => T) : T = {
    runWithTimeoutIgnoreExceptions(timeoutMs)(f).getOrElse(default)
  }

  private def exceptionOrResult[T](f: => T): Either[Throwable, T] = 
    try { 
      Right(f) 
    } catch { 
      case x => Left(x)
    }
}

so that

  @Test def test_exception {
    runWithTimeout(50) { "result" }.right.get should be ("result")
    runWithTimeout(50) { throw new Exception("deliberate") }.left.get.getMessage should be ("deliberate")
    runWithTimeout(50) { Thread.sleep(100); "result" }.left.get should be (Timeout.timeoutException)

    runWithTimeout(50, "no result") { "result" }.right.get should be ("result")
    runWithTimeout(50, "no result") { throw new Exception("deliberate") }.left.get.getMessage should be ("deliberate")
    runWithTimeout(50, "no result") { Thread.sleep(100); "result" }.right.get should be ("no result")

}

Again, I'm a bit of a Scala novice, so would welcome feedback.

2
votes

scala.concurrent.ops.future includes exception handling.

So, instead of importing scala.actors.Futures.future, import scala.concurrent.ops.future instead.

That simple change in which import is there will cause the caller's call to .get to rethrow the exception. It works great!

0
votes

Or use Future.liftTryTry, turns it from Future[Object] to Future[Try[Object]], and you can match on the Try[Object] and check for an exception case Throw(e) and log / exit gracefully

-1
votes

You need to override the method exceptionHandler in order to catch exceptions. So your option is to define your own future method so it creates a MyFutureActor with exceptionHandler.

EDIT: FutureActor is private, so subclassing it isn't possible.

Another option is to use links to know when exceptions happened.

However, I think Rex Kerr's approach is better - just wrap the function in something that will catch the Exception. Too bad future doesn't already do that.