Introduction
Scala's Future (new in 2.10 and now in 2.9.3) is an applicative functor, which means that if we have a traversable type F, we can take an F[A] and a function A => Future[B] and turn them into a Future[F[B]].
This operation is available in the standard library as Future.traverse. Scalaz 7 also provides a more general traverse that we can use here if we import the applicative functor instance for Future from the scalaz-contrib library.
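To make the shape of the operation concrete, here is a minimal sketch using only the standard library on a finite list. The names TraverseSketch, double and doubledAll are hypothetical and exist only for illustration; the sketch uses Future.apply rather than the lowercase future helper so that it also compiles on later Scala versions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical A => Future[B]; any asynchronous function would do.
  def double(i: Int): Future[Int] = Future(i * 2)

  // F[A] (here List[Int]) plus A => Future[B] gives Future[F[B]].
  def doubledAll(xs: List[Int]): Future[List[Int]] =
    Future.traverse(xs)(double)

  def main(args: Array[String]): Unit =
    // Blocking is only for the demo, so that we can print the result.
    println(Await.result(doubledAll(List(1, 2, 3)), 5.seconds)) // prints List(2, 4, 6)
}
```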
These two traverse methods behave differently in the case of streams. The standard library traversal consumes the stream before returning, while Scalaz's returns the future immediately:
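import scala.concurrent._
import ExecutionContext.Implicits.global
import scalaz._, Scalaz._
import scalaz.contrib.std._ // applicative functor instance for Future
// Hangs: the infinite stream is consumed before traverse returns.
val standardRes = Future.traverse(Stream.from(1))(future(_))
// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))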
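One way to observe the standard library's eager consumption without hanging is to count how many elements of a finite stream have been forced by the time Future.traverse returns. This is only a sketch under that assumption; EagerConsumption and forcedOnReturn are hypothetical names. On Scala 2.13 and later, Stream is deprecated in favor of LazyList, so the code compiles with a deprecation warning there.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EagerConsumption {
  // Returns how many stream elements had been forced at the moment
  // Future.traverse returned, i.e. before we wait on the resulting future.
  def forcedOnReturn(n: Int): Int = {
    val forced = new AtomicInteger(0)
    // The counter is bumped each time an element of the mapped stream is forced.
    val stream = Stream.from(1).take(n).map { i => forced.incrementAndGet(); i }
    val result = Future.traverse(stream)(i => Future(i))
    val seen = forced.get // read before awaiting the result
    Await.result(result, 5.seconds)
    seen
  }

  def main(args: Array[String]): Unit =
    println(forcedOnReturn(5)) // prints 5
}
```

All five elements have already been forced when the call returns, which is why the same call on Stream.from(1) never comes back.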
There's another difference, as Leif Warner has observed. The standard library's traverse starts all of the asynchronous operations immediately, while Scalaz's starts the first, waits for it to complete, starts the second, waits for it, and so on.
Different behavior for streams
It's pretty easy to show this second difference by writing a function that sleeps for ten seconds on the first value in the stream and not at all on the others:
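// Sleep time in milliseconds: ten seconds for the first element, none otherwise.
def howLong(i: Int) = if (i == 1) 10000 else 0
import scalaz._, Scalaz._
import scalaz.contrib.std._
// Logs when the computation for i starts and finishes.
def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}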
Now Future.traverse(Stream(1, 2))(toFuture) will print the following:
Starting 1!
Starting 2!
Done 2!
Done 1!
And the Scalaz version (Stream(1, 2).traverse(toFuture)):
Starting 1!
Done 1!
Starting 2!
Done 2!
Which probably isn't what we want here.
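The two orderings above correspond to two ways of combining futures, which we can sketch with the standard library alone. This is not how either library implements traverse; it is only an illustration of the two semantics. The names EventLog, TwoSemantics, startAll, oneAtATime and slowFirst are hypothetical, and slowFirst is a stand-in for toFuture that records events instead of printing them and sleeps for 300 ms rather than ten seconds.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Thread-safe, ordered record of events (hypothetical helper).
final class EventLog {
  private var events = Vector.empty[String]
  def add(e: String): Unit = synchronized { events = events :+ e }
  def all: Vector[String] = synchronized(events)
}

object TwoSemantics {
  // Creates every future up front, then combines the results in order.
  def startAll[A, B](as: List[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[List[B]] = {
    val started = as.map(f) // every f(a) has been called at this point
    started.foldRight(Future.successful(List.empty[B])) { (fb, acc) =>
      for (b <- fb; bs <- acc) yield b :: bs
    }
  }

  // Calls f on an element only after the previous element's future completes.
  def oneAtATime[A, B](as: List[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[List[B]] =
    as.foldLeft(Future.successful(Vector.empty[B])) { (acc, a) =>
      acc.flatMap(bs => f(a).map(b => bs :+ b))
    }.map(_.toList)

  // Stand-in for toFuture: slow on 1, immediate otherwise.
  def slowFirst(log: EventLog, i: Int)(implicit ec: ExecutionContext): Future[Int] =
    Future {
      log.add("Starting " + i)
      Thread.sleep(if (i == 1) 300 else 0)
      log.add("Done " + i)
      i
    }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val log = new EventLog
      val res = oneAtATime(List(1, 2))(i => slowFirst(log, i))
      println(Await.result(res, 5.seconds)) // prints List(1, 2)
      println(log.all.mkString(", ")) // prints Starting 1, Done 1, Starting 2, Done 2
    } finally pool.shutdown()
  }
}
```

With startAll in place of oneAtATime, the exact interleaving depends on thread scheduling, but "Starting 2" no longer has to wait for "Done 1".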
And for lists?
Strangely enough, the two traversals behave the same in this respect on lists: Scalaz's doesn't wait for one future to complete before starting the next.
Another future
Scalaz also includes its own concurrent package with its own implementation of futures. We can use the same kind of setup as above:
import scalaz.concurrent.{ Future => FutureZ, _ }
def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}
Traversing with toFutureZ then gives the one-at-a-time behavior we saw for Scalaz on streams, this time for lists as well as streams:
Starting 1!
Done 1!
Starting 2!
Done 2!
Perhaps less surprisingly, traversing an infinite stream still returns immediately.
Question
At this point we really need a table to summarize, but a list will have to do:
- Streams with standard library traversal: consume before returning; don't wait for each future.
- Streams with Scalaz traversal: return immediately; do wait for each future to complete.
- Scalaz futures with streams: return immediately; do wait for each future to complete.
And:
- Lists with standard library traversal: don't wait.
- Lists with Scalaz traversal: don't wait.
- Scalaz futures with lists: do wait for each future to complete.
Does this make any sense? Is there a "correct" behavior for this operation on lists and streams? Is there some reason that the "most asynchronous" behavior—i.e., don't consume the collection before returning, and don't wait for each future to complete before moving on to the next—isn't represented here?
traverse returns a Future[Stream[B]]; that part's not up for debate. The question is what the semantics should be. – Travis Brown