41
votes

Introduction

Scala's Future (new in 2.10 and now 2.9.3) is an applicative functor, which means that if we have a traversable type F, we can take an F[A] and a function A => Future[B] and turn them into a Future[F[B]].

This operation is available in the standard library as Future.traverse. Scalaz 7 also provides a more general traverse that we can use here if we import the applicative functor instance for Future from the scalaz-contrib library.

These two traverse methods behave differently in the case of streams. The standard library traversal consumes the stream before returning, while Scalaz's returns the future immediately:

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))

There's also another difference, as Leif Warner observes here. The standard library's traverse starts all of the asynchronous operations immediately, while Scalaz's starts the first, waits for it to complete, starts the second, waits for it, and so on.

Different behavior for streams

It's pretty easy to show this second difference by writing a function that will sleep for a few seconds for the first value in the stream:

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

Now Future.traverse(Stream(1, 2))(toFuture) will print the following:

Starting 1!
Starting 2!
Done 2!
Done 1!

And the Scalaz version (Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!

Which probably isn't what we want here.

And for lists?

Strangely enough the two traversals behave the same in this respect on lists—Scalaz's doesn't wait for one future to complete before starting the next.

Another future

Scalaz also includes its own concurrent package with its own implementation of futures. We can use the same kind of setup as above:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}

And then we get the behavior of Scalaz on streams for lists as well as streams:

Starting 1!
Done 1!
Starting 2!
Done 2!

Perhaps less surprisingly, traversing an infinite stream still returns immediately.

Question

At this point we really need a table to summarize, but a list will have to do:

  • Streams with standard library traversal: consume before returning; don't wait for each future.
  • Streams with Scalaz traversal: return immediately; do wait for each future to complete.
  • Scalaz futures with streams: return immediately; do wait for each future to complete.

And:

  • Lists with standard library traversal: don't wait.
  • Lists with Scalaz traversal: don't wait.
  • Scalaz futures with lists: do wait for each future to complete.

Does this make any sense? Is there a "correct" behavior for this operation on lists and streams? Is there some reason that the "most asynchronous" behavior—i.e., don't consume the collection before returning, and don't wait for each future to complete before moving on to the next—isn't represented here?

2
In "optimal" situation, Future.traverse on streams would have to return a stream (meaning lazy reading elements from input when they are requested on output) of Futures, created when requested. While certainly possible, it is more difficult to implement.soulcheck
@soulcheck: In this context traverse returns a Future[Stream[B]]—that part's not up for debate. The question is what the semantics should be.Travis Brown
you're right, wasn't reading it correctly. I see where your doubts are coming from now too.soulcheck
did you look into scala's traverse source? Damn, that's some ugly scala code. Especially that for :)soulcheck
Related question and link to ML at stackoverflow.com/a/17183164/1296806som-snytt

2 Answers

1
votes

I cannot answer it all, but i try on some parts:

Is there some reason that the "most asynchronous" behavior—i.e., don't consume the collection before returning, and don't wait for each future to complete before moving on to the next—isn't represented here?

If you have dependent calculations and a limited number of threads, you can experience deadlocks. For example you have two futures depending on a third one (all three in the list of futures) and only two threads, you can experience a situation where the first two futures block all two threads and the third one never gets executed. (Of course, if your pool size is one, i.e. zou execute one calculation after the other, you can get similar situations)

To solve this, you need one thread per future, without any limitation. This works for small lists of futures, but not for big one. So if you run all in parallel, you will get a situation where small examples will run in all cases and bigger one will deadlock. (Example: Developer tests run fine, production deadlocks).

Is there a "correct" behavior for this operation on lists and streams?

I think it is impossible with futures. If you know something more of the dependencies, or when you know for sure that the calculations will not block, a more concurrent solution might be possible. But executing lists of futures looks for me "broken by design". Best solution seems one, that will already fail for small examples for deadlocks (i.e. execute one Future after the other).

Scalaz futures with lists: do wait for each future to complete.

I think scalaz uses for comprehensions internally for traversal. With for comprehensions, it is not guaranteed that the calculations are independent. So I guess that Scalaz is doing the right thing here with for comprehensions: Doing one calculation after the other. In the case of futures, this will always work, given you have unlimited threads in you operating system.

So in other words: You see just an artifact of how for comprehensions (must) work.

I hope this makes some sense.

1
votes

If I understand the question correctly, I think it really comes down to the semantics of streams vs lists.

Traversing a list does what we'd expect from the docs:

Transforms a TraversableOnce[A] into a Future[TraversableOnce[B]] using the provided function A => Future[B]. This is useful for performing a parallel map. For example, to apply a function to all items of a list in parallel:

With streams, it's up to the developer to decide how they want it to work because it depends on more knowledge of the stream than the compiler has (streams can be infinite, but the type system doesn't know about it). if my stream is reading lines from a file, I want to consume it first, since chaining futures line by line wouldn't actually parallelize things. in this case, I would want the parallel approach.

On the other hand, if my stream is an infinite list generating sequential integers and hunting for the first prime greater than some large number, it would be impossible to consume the stream first in one sweep (the chained Future approach would be required, and we'd probably want to run over batches from the stream).

Rather than trying to figure out a canonical way to handle this, I wonder if there are missing types that would help make the different cases more explicit.