2 votes

I'm converting some C# code to Scala and Akka Streams.

My C# code looks something like this:


Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...

async Task<Result> GetResultAsync(Request request) 
{
    var result1 = await GetPartialResult1Async(request);
    var result2 = await GetPartialResult2Async(request);
    return new Result(request, result1, result2);
}

Now for the Akka Streams version. Instead of a function from a Request to a Task of a result, I have flows from a Request to a result.

So I already have the following two flows:

val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...

However, I can't see how to combine them into a complete flow: calling via on the first flow loses the original request, and calling via on the second flow loses the result of the first.
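
For concreteness, this is as far as the naive composition gets (a minimal sketch using the flows above):

val result1Only: Flow[Request, Result1, NotUsed] =
  Flow[Request].via(partialResult1Flow)
// at this point only Result1 elements are flowing: the original Request is gone,
// so there is nothing left to feed into partialResult2Flow or into Result(...)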

So I've created a WithState monad which looks something like this:

case class WithState[+TState, +TValue](value: TValue, state: TState) {
  def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
    WithState(func(value), state)
  }
  ... bunch more helper functions go here
}
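
One of those helpers, for what it's worth, is an async variant of map that threads the state through a Future (a rough sketch; it lives inside the case class and assumes scala.concurrent.Future and an implicit ExecutionContext are in scope):

  def mapAsync[TResult](func: TValue => Future[TResult])(implicit ec: ExecutionContext): Future[WithState[TState, TResult]] =
    func(value).map(WithState(_, state))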

Then I'm rewriting my original flows to look like this:

def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1], NotUsed] = ...
def partialResult2Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result2], NotUsed] = ...

and using them like this:

val flow = Flow[Request]
    .map(x => WithState(x, x))
    .via(partialResult1Flow)
    .map(x => WithState(x.state, (x.state, x.value)))
    .via(partialResult2Flow)
    .map(x => Result(x.state._1, x.state._2, x.value))

Now this works, but of course I can't guarantee how flow will be used. So I really ought to make it take a TState parameter:

def flow[TState] = Flow[WithState[TState, Request]]
    .map(x => WithState(x.value, (x.state, x.value)))
    .via(partialResult1Flow)
    .map(x => WithState(x.state._2, (x.state, x.value)))
    .via(partialResult2Flow)
    .map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))

At this stage my code is getting extremely hard to read. I could clean it up by naming the functions, using case classes instead of tuples, and so on, but fundamentally there's a lot of incidental complexity here that's hard to avoid.

Am I missing something? Is this not a good use case for Akka Streams? Is there some built-in way of doing this?

Why can't you use Future instead of Flow? It's simpler and easier to combine, and Future is a standard Scala class, so almost every Scala developer knows how to use it. – Mikhail Ionkin
My team has decided we want to use Akka Streams for its benefits around backpressure, and I'm evaluating it for our use case. It could well be that it's not the right solution, in which case we won't use it. – Yair Halberstadt

3 Answers

1 vote

I don't have any fundamentally different way to do this than I described in the question.

However the current flow can be significantly improved:

Stage 1: FlowWithContext

Instead of using a custom WithState monad, it's possible to use the built-in FlowWithContext.

The advantage of this is that you can use the standard operators on the flow, without needing to worry about transforming the WithState monad. Akka takes care of this for you.
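
For example, map and filter on a FlowWithContext operate on the element only, and the TState context is carried through untouched (a minimal, purely illustrative sketch):

def passThroughExample[TState]: FlowWithContext[Request, TState, Request, TState, NotUsed] =
  FlowWithContext[Request, TState]
    .map(identity)     // transforms the element; the context travels alongside automatically
    .filter(_ => true) // if an element is dropped, its context is dropped with it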

So instead of

def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1], NotUsed] = 
    Flow[WithState[TState, Request]]
      .mapAsync(parallelism = 4)(_.mapAsync(doRequest(_))) // inner mapAsync is the WithState helper; parallelism value is arbitrary here

We can write:

def partialResult1Flow[TState]: FlowWithContext[Request, TState, Result1, TState, NotUsed] = 
    FlowWithContext[Request, TState].mapAsync(parallelism = 4)(doRequest(_))

Unfortunately though, whilst FlowWithContext is quite easy to write when you don't need to change the context, it's a little fiddly to use when you need to go via a stream which requires you to move some of your current data into the context (as ours does). In order to do that you need to convert to a Flow (using asFlow), and then back to a FlowWithContext using asFlowWithContext.

I found it easiest to just write the whole thing as a Flow in such cases, and convert to a FlowWithContext at the end.

For example:

def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] = 
  Flow[(Request, TState)]
    .map(x => (x._1, (x._1, x._2)))
    .via(partialResult1Flow)
    .map(x => (x._2._1, (x._2._1, x._1, x._2._2)))
    .via(partialResult2Flow)
    .map(x => (Result(x._2._1, x._2._2, x._1), x._2._3))
    .asFlowWithContext((a: Request, b: TState) => (a,b))(_._2)
    .map(_._1)

Is this any better?

In this particular case it's probably worse. In other cases, where you rarely need to change the context, it would be better. Either way, I would recommend using it rather than relying on a custom monad, since it's built in.

Stage 2: viaUsing

In order to make this a bit more user-friendly, I created a viaUsing extension method for Flow and FlowWithContext:

import akka.stream.{FlowShape, Graph}
import akka.stream.scaladsl.{Flow, FlowWithContext}

object FlowExtensions {
  implicit class FlowViaUsingOps[In, Out, Mat](val f: Flow[In, Out, Mat]) extends AnyVal {
    def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, Out), (Out2, Out)], Mat2]) : Flow[In, (Out2, Out), Mat] =
      f.map(x => (func(x), x)).via(flow)
  }

  implicit class FlowWithContextViaUsingOps[In, CtxIn, Out, CtxOut, Mat](val f: FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends AnyVal {
    def viaUsing[Out2, Using, Mat2](func: Out => Using)(flow: Graph[FlowShape[(Using, (Out, CtxOut)), (Out2, (Out, CtxOut))], Mat2]):
    FlowWithContext[In, CtxIn, (Out2, Out), CtxOut, Mat] =
      f
        .asFlow
        .map(x => (func(x._1), (x._1, x._2)))
        .via(flow)
        .asFlowWithContext((a: In, b: CtxIn) => (a,b))(_._2._2)
        .map(x => (x._1, x._2._1))
  }
}

The purpose of viaUsing is to create the input for a FlowWithContext from the current output, whilst preserving the current output by passing it through the context. It results in a flow whose output is a tuple of the nested flow's output and the original output.

With viaUsing our example simplifies to:

  def flow[TState]: FlowWithContext[Request, TState, Result, TState, NotUsed] =
    FlowWithContext[Request, TState]
      .viaUsing(x => x)(partialResult1Flow)
      .viaUsing(x => x._2)(partialResult2Flow)
      .map(x => Result(x._2._2, x._2._1, x._1))

I think this is a significant improvement. I've made a request to add viaUsing to Akka instead of relying on extension methods here.

1 vote

I agree that using Akka Streams for backpressure is useful. However, I'm not convinced that modelling the calculation of the partial results as separate streams is useful here. Basing the 'inner' logic on Futures and wrapping those in the mapAsync of your flow, so that backpressure applies to the whole operation as one unit, seems simpler and perhaps even better.

This is basically a boiled-down refactoring of Levi Ramsey's earlier excellent answer:

import scala.concurrent.{ ExecutionContext, Future }
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._

case class Request()
case class Result1()
case class Result2()
case class Response(r: Request, r1: Result1, r2: Result2)

def partialResult1(req: Request): Future[Result1] = ???
def partialResult2(req: Request): Future[Result2] = ???

val system = akka.actor.ActorSystem()
implicit val ec: ExecutionContext = system.dispatcher

val flow: Flow[Request, Response, NotUsed] =
  Flow[Request]
    .mapAsync(parallelism = 12) { req =>
      for {
        res1 <- partialResult1(req)
        res2 <- partialResult2(req)
      } yield (Response(req, res1, res2))
    }

I would start with this, and only introduce an intermediate step in the Flow if you know you have reason to split partialResult1 and partialResult2 into separate stages. Depending on your requirements, mapAsyncUnordered might be more suitable.
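
For completeness, the mapAsyncUnordered variant is a one-line change to the flow above (a sketch reusing the same definitions); it gives up output ordering so that one slow request doesn't hold up the elements behind it:

val unorderedFlow: Flow[Request, Response, NotUsed] =
  Flow[Request]
    .mapAsyncUnordered(parallelism = 12) { req =>
      for {
        res1 <- partialResult1(req)
        res2 <- partialResult2(req)
      } yield Response(req, res1, res2)
    }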

0 votes

Disclaimer: I'm not totally familiar with C#'s async/await.

From what I've been able to glean from a quick perusal of the C# docs, Task<T> is a strictly (i.e. eagerly, not lazily) evaluated computation which will, if successful, eventually contain a T. The Scala equivalent is Future[T], and the equivalent of the C# code would be:

import scala.concurrent.{ ExecutionContext, Future }

def getPartialResult1Async(req: Request): Future[Result1] = ???
def getPartialResult2Async(req: Request): Future[Result2] = ???

def getResultAsync(req: Request)(implicit ectx: ExecutionContext): Future[Result] = {
  val result1 = getPartialResult1Async(req)
  val result2 = getPartialResult2Async(req)
  result1.zipWith(result2) { (r1, r2) =>
    new Result(req, r1, r2)
  }
  /* Could also:
   *   for {
   *     r1 <- result1
   *     r2 <- result2
   *    } yield { new Result(req, r1, r2) }
   *
   * Note that both the `result1.zipWith(result2)` and the above `for`
   * construction may compute the two partial results simultaneously.  If you
   * want to ensure that the second partial result is computed after the first 
   * partial result is successfully computed:
   *   for {
   *     r1 <- getPartialResult1Async(req)
   *     r2 <- getPartialResult2Async(req)
   *   } yield new Result(req, r1, r2)
   */
}

No Akka Streams required for this particular case, but if you have some other need to use Akka Streams, you could express this as:

val actorSystem: akka.actor.ActorSystem = ??? // in Akka 2.6, you'd probably have this as an implicit val
val parallelism: Int = ??? // controls the number of requests in flight

val flow = Flow[Request]
  .mapAsync(parallelism) { req =>
    import actorSystem.dispatcher

    getPartialResult1Async(req).map { r1 => (req, r1) }
  }
  .mapAsync(parallelism) { tup =>
    import actorSystem.dispatcher

    getPartialResult2Async(tup._1).map { r2 =>
      new Result(tup._1, tup._2, r2)
    }
  }

  /* Given the `getResultAsync` function in the previous snippet, you could also:
   *   val flow = Flow[Request].mapAsync(parallelism) { req =>
   *     getResultAsync(req)(actorSystem.dispatcher)
   *   }
   */

One advantage of the Future-based implementation is that it's pretty easy to integrate with whatever Scala abstraction of concurrency/parallelism you want to use in a given context (e.g. Cats, Akka Streams, plain Akka). My general instinct for an Akka Streams integration would be in the direction of the three-liner in the comment at the end of the second code block.
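
For reference, that three-liner spelled out as a standalone value (a sketch reusing getResultAsync, actorSystem and parallelism from the snippets above):

val resultFlow: Flow[Request, Result, NotUsed] =
  Flow[Request].mapAsync(parallelism) { req =>
    getResultAsync(req)(actorSystem.dispatcher)
  }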