3
votes

I have an iterable of futures, each of which returns a sequence: Iterable[Future[Seq[Int]]] As a result, I need a sequence which is the concatenation of the sequences returned from the futures: Seq[Int].

The thing is that I only need the first n elements of resulting sequence, so I don't always need to execute all the futures. I also don't know in advance how many futures need to be executed to achieve it (maybe the first one will return enough, maybe we have to execute all).

Obviously, I need to execute my features sequentially. I could do foreach and break/return, but I'd like to express it in functional style.

2
Note that if you already have a Iterable of Futures, they have already "started".Ende Neu
@EndeNeu ah, this is a very valuable comment! So, in fact, there is no way to "cancel" them anymore?roman-roman
but, wait a moment, if it is really an Iterable, and not a Seq, will they really be started? I will initialize them "on demand", shouldn't this help?roman-roman
It all depends on how you "fit" those futures into the iterable, if you have something like val f1: Future[_] it means the future is already working. My 2 cents would be to have a Seq of functions (depends on your logic and on what you're trying to do here) and then wrap them into a future while iterating the seq using something like the answer below.Ende Neu

2 Answers

2
votes

The following seems to work and 'looks' functional. I have limited knowledge of how this will actually perform or act under the hood.

This could easily be slapped into a def where the hard coded 4 is passed a parameter.

I figured that I should return 5 elements even though only 4 were requested since evaluation of the middle future needs to happen either way. Dropping the superfluous elements should be simple.

val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8)))

val output = f.foldLeft(Future(Seq.empty[Int])){(previous, next) =>
    previous.flatMap{pSeq =>
        if(pSeq.length >= 4) {
            Future(pSeq)
        } else {
            next.map(nSeq => pSeq ++ nSeq)
        }
    }
}

println(Await.result(output, Duration.Inf)) //List(1,2,3,4,5)

The bit I don't like is wrapping the pSeq in a Future just to maintain consistent types.

EDIT: Just a response to viktor's answer (I can't comment since not high enough rep and it adds value to my answer slightly).

Even though Viktor's answer is easier to read it must wait for all Futures to complete even if they aren't required.

For example replace f in mine with the following:

val f = Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(throw new Exception))

It will still work, Viktor's calls Future.sequence which turns a an Iterable[Future[]] int an Future[Iterable[]] and therefore all must be completed.

1
votes

Either you use Future.fold:

scala> import scala.concurrent._

scala> import ExecutionContext.Implicits.global

scala> Future.fold(Iterable(Future(Seq(1,2,3)), Future(Seq(4,5)), Future(Seq(6,7,8))))(Seq.empty[Int])( (prev, cur) => if(prev.size >= 4) prev else prev ++ cur) foreach println

List(1, 2, 3, 4, 5)

scala>

Or you look at how Future.fold is implemented and you add an exit condition. (essentially a foldUntil)