
Basically, I'm running two Future-based queries on Cassandra, then I need to do some computation and return a value (an average of values).

Here is my code:

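import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object TestWrapFuture {
  def main(args: Array[String]): Unit = {
    val category = 5392
    ExtensiveComputation.average(category).onComplete {
      case Success(s) => println(s)
      case Failure(f) => throw new Exception(f)
    }
  }
}

class ExtensiveComputation {

  // Shared mutable buffer, filled in from the callbacks of the SKU futures
  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        // Each SKU future is started here but never awaited,
        // so the loop can finish before any volume has been appended
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }

      val average = volume.sum / volume.length
      average
    }
  }
}

object ExtensiveComputation extends ExtensiveComputation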

So what is the problem?

The skus.foreach calls append each volume to a ListBuffer. Since everything is asynchronous, when I try to obtain the result in main, I get an error saying I can't divide by zero.

Indeed, since my Sku.findSkusByProductId returns a Future, the volume buffer is still empty when I compute the average.

Should I block on something before this computation, or should I do something else?
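A minimal, self-contained sketch of why the buffer is empty: calling map on a Future only registers a callback, which runs once that Future completes, not when map returns. Here a Promise that is never completed stands in for a slow Cassandra query; the names CallbackTiming and pendingQuery are hypothetical and not part of the original code.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object CallbackTiming {
  // Hypothetical stand-in for a SKU query whose result has not arrived yet
  val pendingQuery: Promise[Seq[Int]] = Promise[Seq[Int]]()

  val volume = new ListBuffer[Int]()

  // map only registers a callback; it runs once pendingQuery completes, not now
  val appended: Future[Unit] = pendingQuery.future.map(vs => vs.foreach(v => volume += v))
}
```

Right after map returns, CallbackTiming.volume is empty, so volume.sum / volume.length evaluates 0 / 0 and throws ArithmeticException. Completing the promise later (for example pendingQuery.success(Seq(6, 24))) would fill the buffer on a pool thread at some later point, which is the ordering the original average never waits for.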

EDIT

Well, I tried to block like this:

  import scala.concurrent.Await
  import scala.concurrent.duration.Duration

  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    val blocked = productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }
    }

    // Waits only for `blocked` (the loop that starts the SKU queries),
    // not for the SKU futures themselves
    Await.result(blocked, Duration.Inf)
    val average = volume.sum / volume.length
    Future.successful(average)
  }

Then I get two different outcomes from this piece of code:

    Sku.findSkusByProductId(prod._1).map { skus =>
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
    }

1 - When there are only a few (around 50) to look up in Cassandra, it runs and gives me the result.

2 - When there are many (around 1000), it throws:

java.lang.ArithmeticException: / by zero
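A minimal sketch of why blocking on `blocked` is not enough: the outer future completes as soon as its body has *started* the inner SKU futures, so Await.result on it returns while those inner futures may still be running. Whether they finish before the division is then a race, which would be consistent with results that depend on how many lookups there are. The object OuterAwaitOnly and the Promise standing in for a SKU query are hypothetical. The buffer is accessed under synchronized because ListBuffer is not thread-safe and the callbacks run on pool threads.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OuterAwaitOnly {
  // Returns (inner query completed?, buffer size) right after awaiting the outer future
  def run(): (Boolean, Int) = {
    val innerQuery = Promise[Seq[Int]]()   // hypothetical SKU query still in flight
    val volume = new ListBuffer[Int]()

    // Like `blocked` in the question: the outer future only starts the inner work
    val blocked: Future[Unit] = Future {
      innerQuery.future.foreach(vs => volume.synchronized { volume ++= vs })
    }

    Await.result(blocked, 5.seconds)        // returns once the outer body has run
    (innerQuery.isCompleted, volume.synchronized(volume.length))
  }
}
```

Calling OuterAwaitOnly.run() returns (false, 0): the await succeeded, yet the inner query has not completed and nothing has been appended.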

EDIT 2

I tried this code, as @Olivier Michallat proposed:

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) findBlocking(prod._1)
      volume.sum / volume.length
    }
  }

  def findBlocking(productId: Long) = {
    val future = Sku.findSkusByProductId(productId).map { skus =>
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
    }

    Await.result(future, Duration.Inf)
  }

And the following, as @kolmar proposed:

  def average(categoryId: Int): Future[Int] = {
    for {
      prods <- Product.findProductsByCategory(categoryId)
      filtered = prods.filter(_._2)
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
      volumes.sum / volumes.size
    }
  }

Both work when there are only a few SKUs to find (around 50), but both fail when there are many (around 1000), throwing ArithmeticException: / by zero.

It seems that not everything is computed before the future is returned...


2 Answers

3
votes

You need to wait until all the futures generated by findSkusByProductId have completed before you compute the average. So accumulate all these futures in a Seq, call Future.sequence on it to get a Future[Seq], then map that future to a function that computes the average. Then replace productsByCategory.map with a flatMap.
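A sketch of the steps above, run against hypothetical in-memory stubs. The shapes of Product, Sku, and their finder methods are assumptions inferred from how the question uses them (prod._1 as a Long id, prod._2 as an "active" flag, Option[Int] dimensions); the real Cassandra-backed versions are not shown in the question.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the question's Cassandra-backed models
final case class Sku(height: Option[Int], width: Option[Int], length: Option[Int])

object Product {
  // (productId, isActive) pairs, mirroring prod._1 / prod._2 in the question
  def findProductsByCategory(categoryId: Int): Future[Seq[(Long, Boolean)]] =
    Future.successful(Seq((1L, true), (2L, false), (3L, true)))
}

object Sku {
  def findSkusByProductId(productId: Long): Future[Seq[Sku]] =
    Future.successful(Seq(Sku(Some(1), Some(2), Some(productId.toInt))))
}

object SequenceAverage {
  def average(categoryId: Int): Future[Double] =
    Product.findProductsByCategory(categoryId).flatMap { prods => // flatMap, not map
      // One Future per active product, accumulated in a Seq
      val skuFutures: Seq[Future[Seq[Sku]]] =
        prods.filter(_._2).map(p => Sku.findSkusByProductId(p._1))
      // Future.sequence turns Seq[Future[Seq[Sku]]] into Future[Seq[Seq[Sku]]]
      Future.sequence(skuFutures).map { skuLists =>
        val volumes = skuLists.flatten.map(s => s.height.get * s.width.get * s.length.get)
        volumes.sum.toDouble / volumes.size
      }
    }
}
```

With these stubs, Await.result(SequenceAverage.average(5392), 5.seconds) gives 4.0 (volumes 2 and 6 from the two active products). Note that with .toDouble an empty result produces NaN instead of an ArithmeticException, so it changes the symptom but not the cause if no SKUs come back.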

2
votes

Since you have to call a function that returns a Future on a sequence of arguments, it's better to use Future.traverse for that.

For example:

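import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object ExtensiveComputation {
  def average(categoryId: Int): Future[Double] = {
    for {
      products <- Product.findProductsByCategory(categoryId)
      filtered = products.filter(_._2)
      // One Future per product; traverse combines them into a single
      // Future holding one SKU collection per product
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      // Flatten the per-product collections before computing volumes
      val volumes = skus.flatten.map { sku =>
        sku.height.get * sku.width.get * sku.length.get
      }
      volumes.sum.toDouble / volumes.size
    }
  }
}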
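A runnable sketch of the traverse approach against hypothetical in-memory stubs (the Product/Sku shapes are assumptions based on the question's usage). Future.traverse(xs)(f) behaves like Future.sequence(xs.map(f)), yielding one SKU collection per product, hence the flatten. As a swapped-in variant, not the answer's own code, this version returns Option[Double] so that "no SKUs found" surfaces as None instead of a division by zero; category 0 is a made-up "no active products" case.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical in-memory stand-ins for the question's Cassandra models
final case class Sku(height: Option[Int], width: Option[Int], length: Option[Int])

object Product {
  // (productId, isActive); category 0 is a hypothetical "no active products" case
  def findProductsByCategory(categoryId: Int): Future[Seq[(Long, Boolean)]] =
    Future.successful(if (categoryId == 0) Seq.empty else Seq((1L, true), (2L, true)))
}

object Sku {
  def findSkusByProductId(productId: Long): Future[Seq[Sku]] =
    Future.successful(Seq(Sku(Some(1), Some(1), Some(productId.toInt)), Sku(Some(2), Some(2), Some(2))))
}

object TraverseAverage {
  // None when there is nothing to average, instead of dividing by zero
  def average(categoryId: Int): Future[Option[Double]] =
    for {
      products <- Product.findProductsByCategory(categoryId)
      active = products.filter(_._2)
      // traverse maps each product to a Future and combines them: Future[Seq[Seq[Sku]]]
      skus <- Future.traverse(active)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.flatten.map(s => s.height.get * s.width.get * s.length.get)
      if (volumes.isEmpty) None else Some(volumes.sum.toDouble / volumes.size)
    }
}
```

With these stubs, category 1 yields Some(4.75) (volumes 1, 8, 2, 8) and category 0 yields None. If the real queries unexpectedly return no SKUs for large categories, the None makes that visible without hiding it behind an exception.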