14
votes

I sometimes find myself in a situation where I have some Stream[X], and a function X => Future Y, that I'd like to combine to a Future[Stream[Y]], and I can't seem to find a way to do it. For example, I have

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

I tried

 val result = Future.Traverse(x, toFutureString)

which gives the correct result, but seems to consume the entire stream before returning the Future, which more or less defeats the purpse

I tried

val result = x.flatMap(toFutureString)

but that doesn't compile with type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString)

returns the somewhat odd and useless Stream[Future[String]]

What should I do here to get things fixed?

Edit: I'm not stuck on a Stream, I'd be equally happy with the same operation on an Iterator, as long as it won't block on evaluating all items before starting to process the head

Edit2: I'm not 100% sure that the Future.Traverse construct needs to traverse the entire stream before returning a Future[Stream], but I think it does. If it doesn't, that's a fine answer in itself.

Edit3: I also don't need the result to be in order, I'm fine with the stream or iterator returned being whatever order.

3
Note that I've filed an issue to follow up on my answer below.Travis Brown
ah, great @TravisBrown. I wanted to do that myself, but I couldn't find a way to log in to JiraMartijn
A little unclear - you want to avoid applying "toFutureString" to all elements in the collection before...? Seems there shouldn't be much overhead to simply creating a Future. If the remaining items in the "list" are thunks, what would trigger their evaluation? Completion of the Future previous in the list? All of the sequence / traverse operations I could find in Scala seemed to be strict on the individual list elements.pdxleif
@pdxleif good question, and I suppose that basically is what my questions boils down to. I know I just want a [Future[Stream[String]], but I don't know how and when each evaluation should take place (if I did, I wouldn't be asking ;). In my mind a thunk and a Future are (possibly erraniously) similar things, so I would like to join or compose them I guess?Martijn
I think they are similar. The Scalaz traverse Travis mentions does immediately give you a Future[Stream[String]], where the (single) Future does not complete until the entire Stream has had toFutureStream applied and run to completion. I guess I'm a little curious on the use-case - typically when I have a series of IO computations that involve waiting, with no data dependencies between them, I'd want to kick off their execution eagerly, in parallel. The scalaz traverse instance for Stream seem to give a give a similar effect to what you'd achieve from folding a regular List w/ .flatMap.pdxleif

3 Answers

9
votes

You're on the right track with traverse, but unfortunately it looks like the standard library's definition is a little broken in this case—it shouldn't need to consume the stream before returning.

Future.traverse is a specific version of a much more general function that works on any applicative functor wrapped in a "traversable" type (see these papers or my answer here for more information, for example).

The Scalaz library provides this more general version, and it works as expected in this case (note that I'm getting the applicative functor instance for Future from scalaz-contrib; it's not yet in the stable versions of Scalaz, which are still cross-built against Scala 2.9.2, which doesn't have this Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

This returns immediately on an infinite stream, so we know for sure that it's not being consuming first.


As a footnote: If you look at the source for Future.traverse you'll see that it's implemented in terms of foldLeft, which is convenient, but not necessary or appropriate in the case of streams.

2
votes

Forgetting about Stream:

import scala.concurrent.Future
import ExecutionContext.Implicits.global

val x = 1 to 10 toList
def toFutureString(value : Int) = Future {
  println("starting " + value)
  Thread.sleep(1000)
  println("completed " + value)
  value.toString
}

yields (on my 8 core box):

scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@2d9472e2

scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10

So 8 of them get kicked off immediately (one for each core, though that's configurable via the threadpool executor), and then as those complete more are kicked off. The Future[List[String]] returns immediately, and then after a pause it starts printing those "completed x" messages.

An example use of this could be when you have a List[Url's], and a function of type Url => Future[HttpResponseBody]. You could call Future.traverse on that list with that function, and kick off those http requests in parallel, getting back a single future that's a List of the results.

Was something that like what you were going for?

0
votes

The accepted answer is no longer valid as the modern version of Scalaz traverse() behaves differently and tries to consume the entire stream on the invocation time.

As to the question I would say that it's impossible to achieve this a truly non-blocking fashion.

Future[Stream[Y]] cannot be resolved until Stream[Y] is available. And since Y is produced asynchronously by the function X => Future[Y] you cannot get Y without blocking on the time when you traverse Stream[Y]. That means that either all the Future[Y] must be resolved before resolving Future[Stream[Y]] (which requires consuming the entire stream), or you must allow blocks to occur while traversing Stream[Y] (on items whose underlying futures aren't completed yet). But if we allow for blocking on the traversing then what would be the definition of the completion of the resulting future? From that perspective it could be the same as Future.successful(BlockingStream[Y]). That's in turn semantically equal to the original Stream[Future[Y]].

In other words, I think there is an issue in the question itself.