14
votes

I have a list of possible input values:

val inputValues = List(1,2,3,4,5)

I have a function that takes a really long time to compute and gives me a result:

def reallyLongFunction( input: Int ) : Option[String] = { ..... }

Using Scala parallel collections, I can easily do

inputValues.par.map( reallyLongFunction( _ ) )

to get all the results, in parallel. The problem is, I don't really want all the results; I only want the FIRST result. As soon as one of my inputs succeeds, I want my output and want to move on with my life. Mapping over every input does a lot of extra work.

So how do I get the best of both worlds? I want to

  1. Get the first result that returns something from my long function
  2. Stop all my other threads from useless work.

Edit - I solved it like a dumb Java programmer by having

@volatile var done = false;

which is set and checked inside my reallyLongFunction. This works, but does not feel very Scala. I would like a better way to do this...

3
Side note (not an answer to your question): this is IMHO simpler: inputValues.par.map(reallyLongFunction) - Tomasz Nurkiewicz
It doesn't look to me like parallel collections or the fork-join framework were designed to handle this case. If the computation is long because it's CPU intensive, it seems wasteful to compute all the results or split the load between the cores, versus putting all the cores to work on computing a single result. If the computation is long because it's waiting for some IO, it seems futures or actors would be more appropriate. - huynhjl
Well, for each input, it's a purely single-threaded computation that takes ~30 seconds of CPU time per input. About the perfect case to split up the work with fork-join, IF there was a cleaner way to abort on the first successful answer. - bwawok
@bwawok, I may be misunderstanding your use case. It feels like it would amount to the same as this: given n cores, take n inputs, kick off n computations, and wait for the first one to finish. So the whole business about splitting tasks and stealing work from other queues does not come into play... - huynhjl

3 Answers

4
votes

(Updated: no, it doesn't work, doesn't do the map)

Would it work to do something like:

inputValues.par.find({ v => reallyLongFunction(v); true })

The implementation uses this:

  protected[this] class Find[U >: T](pred: T => Boolean, protected[this] val pit: IterableSplitter[T]) extends Accessor[Option[U], Find[U]] {
    @volatile var result: Option[U] = None
    def leaf(prev: Option[Option[U]]) = { if (!pit.isAborted) result = pit.find(pred); if (result != None) pit.abort }
    protected[this] def newSubtask(p: IterableSplitter[T]) = new Find(pred, p)
    override def merge(that: Find[U]) = if (this.result == None) result = that.result
  }

which looks pretty similar in spirit to your @volatile except you don't have to look at it ;-)

3
votes

I interpreted your question in the same way as huynhjl, but if you just want to search and discard Nones, you could do something like this to avoid the need to repeat the computation when a suitable outcome is found:

class Computation[A,B](value: A, function: A => B) {
  lazy val result = function(value)
}

def f(x: Int) = {          // your function here
  Thread.sleep(100 - x)
  if (x > 5) Some(x * 10)
  else None
}

val list = List.range(1, 20) map (i => new Computation(i, f))  
val found = list.par find (_.result.isDefined) 
  //found is Option[Computation[Int,Option[Int]]]
val result = found map (_.result.get)
  //result is Option[Int]

However find for parallel collections seems to do a lot of unnecessary work (see this question), so this might not work well, with current versions of Scala at least.

Volatile flags are used in the parallel collections (take a look at the source for find, exists, and forall), so I think your idea is a good one. It's actually better if you can include the flag in the function itself. It kills referential transparency on your function (i.e. for certain inputs your function now sometimes returns None rather than Some), but since you're discarding the stopped computations, this shouldn't matter.
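To make the flag-inside-the-function idea concrete, here is a minimal sketch. Everything in it is an assumption for illustration: the body of reallyLongFunction is a stand-in that sleeps in small steps, the "even inputs succeed" rule is made up, and the work is driven by standard-library Futures instead of .par so the snippet stays self-contained. An AtomicBoolean plays the role of the @volatile var.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstResultWithFlag {
  // Shared stop flag; AtomicBoolean gives the same visibility as @volatile.
  private val done = new AtomicBoolean(false)

  // Hypothetical stand-in for reallyLongFunction: it works in small steps and
  // checks the flag between steps so it can give up early.
  def reallyLongFunction(input: Int): Option[String] = {
    var step = 0
    while (step < 50) {
      if (done.get()) return None // another input already succeeded
      Thread.sleep(10)            // placeholder for one slice of real work
      step += 1
    }
    // Illustrative success rule (an assumption): only even inputs succeed.
    // compareAndSet lets exactly one computation claim the win.
    if (input % 2 == 0 && done.compareAndSet(false, true)) Some(s"result for $input")
    else None
  }

  def firstResult(inputs: List[Int]): Option[String] = {
    done.set(false) // reset between runs; not safe for concurrent callers
    val all = Future.traverse(inputs)(i => Future(reallyLongFunction(i)))
    Await.result(all, 1.minute).flatten.headOption
  }
}
```

Calling FirstResultWithFlag.firstResult(List(1, 2, 3, 4, 5)) yields at most one Some, and the stopped computations return None almost immediately, which is the loss of referential transparency described above.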

2
votes

If you're willing to use a non-core library, I think Futures would be a good match for this task. For instance:

...both of which appear to enable the functionality you're looking for.
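As a rough illustration of the futures idea, here is a sketch that uses scala.concurrent.Future and Promise from the standard library (available since Scala 2.10) in place of a third-party library, so it is not necessarily how the libraries alluded to above work. The names FirstSuccess, firstDefined and demo are hypothetical. Note that it only returns early; the losing computations keep running unless the function itself checks a stop flag.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object FirstSuccess {
  // Completes with the first Some produced by f, or with None once every
  // input has finished without producing a result.
  def firstDefined[A, B](inputs: Seq[A])(f: A => Option[B]): Future[Option[B]] = {
    val promise   = Promise[Option[B]]()
    val remaining = new AtomicInteger(inputs.size)
    if (inputs.isEmpty) promise.trySuccess(None)
    inputs.foreach { in =>
      Future(f(in)).onComplete { outcome =>
        outcome match {
          case Success(r) if r.isDefined => promise.trySuccess(r) // first winner sticks
          case _                         => // None or a failure: keep waiting
        }
        // The last task to finish settles the promise if nobody succeeded.
        if (remaining.decrementAndGet() == 0) promise.trySuccess(None)
      }
    }
    promise.future
  }

  // Usage sketch with a made-up function standing in for reallyLongFunction.
  def demo(): Option[String] =
    Await.result(
      firstDefined(List(1, 2, 3, 4, 5))(x => if (x == 3) Some("three") else None),
      10.seconds)
}
```

trySuccess is used instead of success because several tasks may race to complete the promise; only the first call takes effect and later ones are ignored.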