I have a web app that does a lot of slow, concurrent work to calculate its result. Instead of leaving the end user hanging, I'd like to stream progress updates back over a websocket.

My codebase is built from compositions of Scalaz eithers (\/), like:

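import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz._, Scalaz._ // assumes a Scalaz version that ships a Monad instance for Future

type ProcessResult = Error \/ Int

def downloadFile(url: String): Future[Error \/ String] = ???
def doSlowProcessing(data1: String, data2: String): Future[ProcessResult] = ???

/* Very simple, but gives no progress updates */
def execute(): Future[ProcessResult] = {
 // Both downloads start here and run concurrently
 val download1 = downloadFile(...)
 val download2 = downloadFile(...)

 // EitherT stops the for-comprehension at the first error
 val et = for {
   d1 <- EitherT(download1)
   d2 <- EitherT(download2)
   processed <- EitherT(doSlowProcessing(d1, d2))
 } yield processed

 et.run
}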

This works very well, but of course the entire computation has to finish before I get anything out of the Future. Even if I stacked a Writer monad on top for logging, I would only get the log once everything had finished, which would not make my end users any happier.

I toyed around with using a scalaz-stream Queue to send the logs as a side effect while the code is running, but the end result is pretty ugly:

def execute(): Process[Task, String \/ ProcessResult] = {
 val (q, src) = async.queue[String \/ ProcessResult]

 val download1 = downloadFile(...)
 val download2 = downloadFile(...)

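 // A for-comprehension must start with a generator, so the first message is sent up front
 q.enqueue("Downloading 1".left)
 val et = for {
   d1 <- EitherT(download1)
   _ = q.enqueue("Downloading 2".left)
   d2 <- EitherT(download2)
   _ = q.enqueue("Doing processing".left)
   processed <- EitherT(doSlowProcessing(d1, d2))
 } yield processed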

 et.run.onSuccess {
  case x =>
   q.enqueue(x.right)
   q.close
 }

 src
}

It feels like there should be an idiomatic way to achieve this. Turning my SIP-14 Scala futures into Tasks is possible if necessary.

1 Answer

I don't think you need a queue. One approach is to use non-deterministic merging with a wye, for example:

type Result = ???
val download1: Process[Task,File] = ???
val download2: Process[Task,File] = ???


val result: Process[Task,(File,File)] = (download1 yip download2).once 

val processed: Process[Task, Result] = result.flatMap(doSlowProcessing)

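// Run asynchronously; the callback receives Throwable \/ Option[Result]
processed.runLast.runAsync {
  case \/-(Some(r)) => .... // result computed
  case \/-(None) => .... // no result, hence download1 and download2 were empty
  case -\/(err) => .... // a download or the processing failed
}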

// or run synchronously, awaiting the result
processed.runLast.run match {
  case Some(r) => .... // result computed
  case None => .... // no result
}

// to capture error information during the download, use
val withError: Process[Task,Throwable\/File] = download1.attempt

// or to log the error and fall back to another file download
val withFallback: Process[Task,File] = download1 onFailure { err => Log(err); download3 }
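
None of the above reports progress yet. One way to get it without a queue is to put the progress messages into the stream itself, so the consumer sees each message as soon as the step before it has finished. The following is only a rough sketch under some assumptions: `downloadFile` and `doSlowProcessing` are hypothetical Task-based stand-ins (not the Future-based ones from the question), the names `ProgressStreamSketch`, `downloads` and `events` are made up for illustration, and the two downloads are combined with `Nondeterminism[Task].both` rather than `yip`.

```scala
import scalaz.{\/, Nondeterminism}
import scalaz.syntax.either._
import scalaz.concurrent.Task
import scalaz.stream.Process

object ProgressStreamSketch {
  // Hypothetical stand-ins for the real download and processing steps.
  def downloadFile(url: String): Task[String] = Task(s"contents of $url")
  def doSlowProcessing(d1: String, d2: String): Task[Int] = Task(d1.length + d2.length)

  // Run both downloads concurrently and wait for both results.
  val downloads: Task[(String, String)] =
    Nondeterminism[Task].both(downloadFile("url1"), downloadFile("url2"))

  // Progress messages travel on the left, the final result on the right.
  val events: Process[Task, String \/ Int] =
    Process.emit("Downloading".left[Int]) ++
      Process.eval(downloads).flatMap { case (d1, d2) =>
        Process.emit("Doing processing".left[Int]) ++
          Process.eval(doSlowProcessing(d1, d2)).map(_.right[String])
      }
}
```

Nothing runs until the stream is consumed, so each element of `events` could be forwarded to the websocket as it arrives, with the last element carrying the result.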

Does that make sense?

Also, please note that async.queue has been deprecated since 0.5.0 in favor of async.unboundedQueue.
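
If you would rather keep the queue-based version from the question, a minimal sketch against the newer API might look like the following. It assumes scalaz-stream 0.5.0 or later (`async.unboundedQueue`, `enqueueOne`, `dequeue`, `close`). `downloadFile` and `doSlowProcessing` are again hypothetical Task-based stand-ins, the object name is made up, and the steps run one after another just to keep the example short.

```scala
import scalaz.{\/, -\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream.{Process, async}

object UnboundedQueueSketch {
  // Hypothetical stand-ins for the real download and processing steps.
  def downloadFile(url: String): Task[String] = Task(s"contents of $url")
  def doSlowProcessing(d1: String, d2: String): Task[Int] = Task(d1.length + d2.length)

  def execute(): Process[Task, String \/ Int] = {
    // Progress messages go on the left, the final result on the right.
    val q = async.unboundedQueue[String \/ Int]

    val work: Task[Unit] = for {
      _  <- q.enqueueOne(-\/("Downloading 1"))
      d1 <- downloadFile("url1")
      _  <- q.enqueueOne(-\/("Downloading 2"))
      d2 <- downloadFile("url2")
      _  <- q.enqueueOne(-\/("Doing processing"))
      r  <- doSlowProcessing(d1, d2)
      _  <- q.enqueueOne(\/-(r))
    } yield ()

    // Start the work in the background and close the queue
    // whether it succeeded or failed, so the consumer terminates.
    work.attempt.flatMap(_ => q.close).runAsync(_ => ())

    // The consumer reads updates from here as they are enqueued.
    q.dequeue
  }
}
```

Note that in this sketch a failure in `work` only closes the queue. Reporting the error to the consumer would need an extra message or a different element type.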