8
votes

I have a list of string ids representing DB records. I'd like to load them from the DB asynchronously, then upload each record to a remote server asynchronously, then when all are done uploading, make a record of the ids of the records that were uploaded.

Since I'm on Scala 2.9.2, I'm using Twitter's core-util Future implementation, but it should behave just like the Scala 2.10 futures as far as monadic transformations go.

The general concept is this:

def fetch(id: String): Future[Option[Record]]
def upload(record: Record): Future[String]
def notifyUploaded(ids: Seq[String]): Unit

val ids: Seq[String] = ....

I'm trying to do this with a for comprehension, but because fetch returns a Future of an Option, the code gets awkward and doesn't compile:

for {
  id <- ids
  maybeRecord <- fetch(id)
  record <- maybeRecord
  uploadedId <- upload(record)
} yield uploadedId

Compiling this results in the following error:

scala: type mismatch;
found   : com.twitter.util.Future[String]
required: Option[?]
    uploadedId <- upload(record)
                  ^

What am I missing? Why does the compiler expect upload(record) to produce an Option? Is there a clean way to work around this?

2
A Monad is a Monoid in the category of Endo-functors. Just sayin' – George
@folone: I fear that not everyone will get the joke. Just sayin' – Régis Jean-Gilles
@folone But the issue is that the natural transformation is from the product of two endofunctors to itself, not that they themselves are endofunctors. – Impredicative
@folone So, each monad is an object in the category of endofunctors on the category of Scala types. In that category, the morphisms are natural transformations between functors, and the thing being broken is that the natural transformation for 'flatMap/bind' is from M x M -> M, whereas composing it would be something like M x T -> TM (e.g. the composition of the functors.) The problem is that it's not guaranteed (and often not true) that the composition is itself a monad. – Impredicative

2 Answers

6
votes

Consider the signature of the flatMap (or bind) function:

trait Monad[M[_]] {
  def flatMap[A, B](a: M[A], f: A => M[B]): M[B]
  ....

In your case, you're trying to use flatMap on an Option, giving it an f that generates a Future. But as in the signature above, f should be generating something in the same monad that it's been called on.
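To make the mismatch concrete, here is a minimal sketch of how the inner part of the question's for comprehension desugars, and one way to keep each level inside a single monad. It assumes scala.concurrent.Future in place of Twitter's Future, and the bodies of fetch and upload are stand-ins invented for illustration; uploadIfPresent and fetchAndUpload are hypothetical helper names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DesugarSketch {
  case class Record(id: String)

  // Stand-in implementations (assumptions, for illustration only).
  def fetch(id: String): Future[Option[Record]] = Future.successful(Some(Record(id)))
  def upload(record: Record): Future[String] = Future.successful(record.id)

  // The last two generators of the question's for comprehension desugar roughly to:
  //   maybeRecord.flatMap(record => upload(record).map(uploadedId => uploadedId))
  // Option.flatMap expects a function returning an Option, but upload returns
  // a Future, so the following does not compile:
  //   def broken(m: Option[Record]) = m.flatMap(record => upload(record))

  // Handling the Option explicitly keeps the result inside Future:
  def uploadIfPresent(maybeRecord: Option[Record]): Future[Option[String]] =
    maybeRecord match {
      case Some(record) => upload(record).map(Some(_))
      case None         => Future.successful(None)
    }

  // Future.flatMap is given a function that returns a Future, as required.
  def fetchAndUpload(id: String): Future[Option[String]] =
    fetch(id).flatMap(uploadIfPresent)
}
```

Each flatMap here receives a function that returns the same kind of container it was called on, which is exactly what the signature above demands.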

Scala isn't necessarily terribly helpful in this regard, since it's pretty good at converting things around (to Seqs, for example) in such a way that you get the impression that you can chain arbitrary flatMap calls together, regardless of the container.
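As a small illustration of that impression, the sketch below mixes a Seq and an Option in one for comprehension. The lookup function and its data are made up for the example. It compiles because the standard library lets an Option be treated as a collection when a Seq comes first; the reverse order does not compile.

```scala
object MixingSketch {
  val ids: Seq[String] = Seq("a", "b", "c")

  // Hypothetical lookup: only "a" and "c" are present.
  def lookup(id: String): Option[Int] = Map("a" -> 1, "c" -> 3).get(id)

  // Compiles: Seq.flatMap accepts the Option because it can be viewed as a collection.
  val found: Seq[Int] = for {
    id <- ids
    n  <- lookup(id)
  } yield n

  // Does not compile (type mismatch), because Option.flatMap needs an Option back:
  //   for { n <- lookup("a"); id <- ids } yield id
}
```

No comparable conversion exists between Option and Future, which is why the question's code is rejected.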

What you possibly want is a 'monad transformer', which gives you some ability to compose monads. Debasish Ghosh has written a post on using Scalaz monad transformers.
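To give a feel for the idea without pulling in Scalaz, here is a hand-rolled sketch of an Option transformer specialised to Future. FutureOption is a hypothetical name and is not Scalaz's OptionT API; the code assumes scala.concurrent.Future with the global execution context rather than Twitter's Future, and the bodies of fetch and upload are placeholders.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TransformerSketch {
  // Hypothetical wrapper that makes Future[Option[A]] usable as a single monad.
  // For brevity it relies on the imported global execution context.
  final case class FutureOption[A](value: Future[Option[A]]) {
    def map[B](f: A => B): FutureOption[B] =
      FutureOption(value.map(_.map(f)))

    def flatMap[B](f: A => FutureOption[B]): FutureOption[B] =
      FutureOption(value.flatMap {
        case Some(a) => f(a).value
        case None    => Future.successful(Option.empty[B])
      })
  }

  case class Record(id: String)

  // Placeholder implementations (assumptions): an empty id is "not found".
  def fetch(id: String): Future[Option[Record]] =
    Future.successful(if (id.nonEmpty) Some(Record(id)) else None)
  def upload(record: Record): Future[String] = Future.successful(record.id)

  // Both generators now live in FutureOption, so the for comprehension compiles.
  def fetchAndUpload(id: String): Future[Option[String]] = {
    val result = for {
      record     <- FutureOption(fetch(id))
      uploadedId <- FutureOption(upload(record).map(Option(_)))
    } yield uploadedId
    result.value
  }

  // Run all ids, then keep only the ids that were actually uploaded.
  def uploadAll(ids: Seq[String]): Future[Seq[String]] =
    Future.traverse(ids)(fetchAndUpload).map(_.flatten)
}
```

The outer Seq is handled separately with Future.traverse rather than being forced into the same for comprehension; the resulting Future[Seq[String]] could then be mapped with notifyUploaded.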

0
votes

You cannot freely mix different types in one for comprehension. I found that you can mix Seq and Option when the Seq comes first (the Option is then treated as a collection and the result is a Seq), but you cannot mix Future with Seq or Option. If you want to use for comprehensions here, you have to nest several of them. In such cases map/flatMap may read more nicely. I implemented your example both ways and annotated a few intermediate results with their types, so you can see the mess that arises when working with all these different types.

object TestClass {

  import scala.concurrent._
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  case class Record(id: String)


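  // Simulates a slow asynchronous DB read that always finds the record.
  def fetch(id: String): Future[Option[Record]] = Future {
    Thread.sleep(1000)
    Some(Record(id))
  }

  // Simulates a slow asynchronous upload and returns the uploaded id.
  def upload(record: Record): Future[String] = Future {
    Thread.sleep(3000)
    record.id + "_uploaded"
  }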

  def notifyUploaded(ids: Seq[String]): Unit = println("notified" + ids)

  val ids: Seq[String] = Seq("a", "b", "c")

  def main(args: Array[String]): Unit = {
    forComprehensionImpl()
    mapAndFlatMapImpl()
  }

  def forComprehensionImpl() = {
    val result: Seq[Future[Option[Future[String]]]] = for {
      id <- ids
    } yield {
      for {
        maybeRecord <- fetch(id)
      } yield {
        for {
          record <- maybeRecord
        } yield {
          for {
            uploadedId <- upload(record)
          } yield uploadedId
        }
      }
    }
    val result2: Future[Seq[Option[Future[String]]]] = Future.sequence(result)
    val result3: Future[Unit] = result2.flatMap { x: Seq[Option[Future[String]]] =>
      Future.sequence(x.flatten).map(notifyUploaded)
    }
    Await.result(result3, Duration.Inf)
  }


  def mapAndFlatMapImpl() = {
    val res: Seq[Future[Iterable[String]]] = ids.map { id =>
      fetch(id).flatMap { maybeRecord =>
        val res1: Option[Future[Seq[String]]] = maybeRecord.map { record =>
          upload(record) map (Seq(_))
        }
        res1 match {
          case Some(a) => a
          case None => Future(Seq())
        }
      }
    }
    val res3: Future[Unit] = Future.sequence(res) map (a => notifyUploaded(a.flatten))
    Await.result(res3, Duration.Inf)
  }
}