I'm converting some C# code to Scala and Akka Streams.
My C# code looks something like this:
Task<Result1> GetPartialResult1Async(Request request) ...
Task<Result2> GetPartialResult2Async(Request request) ...
async Task<Result> GetResultAsync(Request request)
{
    var result1 = await GetPartialResult1Async(request);
    var result2 = await GetPartialResult2Async(request);
    return new Result(request, result1, result2);
}
Now for the Akka Streams version. Instead of having a function from a Request to a Task of a result, I have flows from a Request to a Result.
So I already have the following two flows:
val partialResult1Flow: Flow[Request, Result1, NotUsed] = ...
val partialResult2Flow: Flow[Request, Result2, NotUsed] = ...
However, I can't see how to combine them into a complete flow: calling via on the first flow loses the original request, and calling via on the second flow loses the result of the first flow.
So I've created a WithState monad which looks something like this:
case class WithState[+TState, +TValue](value: TValue, state: TState) {
  def map[TResult](func: TValue => TResult): WithState[TState, TResult] = {
    WithState(func(value), state)
  }
  ... bunch more helper functions go here
}
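To make the intent concrete, here is a minimal sketch of what map does: it transforms the value while the state is carried along unchanged. It is plain Scala with no Akka dependency; the object name WithStateDemo and the sample values are placeholders made up for illustration, and the class is trimmed to just map.

```scala
// Same shape as the WithState class above, trimmed to just map.
final case class WithState[+TState, +TValue](value: TValue, state: TState) {
  // Transform the value; the state rides along untouched.
  def map[TResult](func: TValue => TResult): WithState[TState, TResult] =
    WithState(func(value), state)
}

object WithStateDemo {
  // Hypothetical sample: an Int value paired with some caller-supplied state.
  val start: WithState[String, Int] = WithState(value = 21, state = "caller-context")

  // After map, the value has a new type and content; the state is the same.
  val mapped: WithState[String, String] = start.map(n => s"result-${n * 2}")
}
```

The point is only that any stage written in terms of map cannot accidentally drop or alter the state.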
Then I'm rewriting my original flows to look like this:
def partialResult1Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result1], NotUsed] = ...
def partialResult2Flow[TState]: Flow[WithState[TState, Request], WithState[TState, Result2], NotUsed] = ...
and using them like this:
val flow = Flow[Request]
  .map(x => WithState(x, x))
  .via(partialResult1Flow)
  .map(x => WithState(x.state, (x.state, x.value)))
  .via(partialResult2Flow)
  .map(x => Result(x.state._1, x.state._2, x.value))
Now this works, but of course I can't guarantee how flow will be used: a caller may need to carry its own state through it. So I really ought to make it take a state type parameter too:
def flow[TState] = Flow[WithState[TState, Request]]
  .map(x => WithState(x.value, (x.state, x.value)))
  .via(partialResult1Flow)
  .map(x => WithState(x.state._2, (x.state, x.value)))
  .via(partialResult2Flow)
  .map(x => WithState(Result(x.state._1._2, x.state._2, x.value), x.state._1._1))
At this stage my code is getting extremely hard to read. I could clean it up by naming the functions, using case classes instead of tuples, and so on, but fundamentally there's a lot of incidental complexity here that is hard to avoid.
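For reference, a rough sketch of what that cleanup might look like, with named steps and case classes in place of the nested tuples. To keep it self-contained, the two partial flows are modelled as plain functions on WithState rather than real Akka flows. Everything here is an assumption for illustration: the fields of Request, Result1 and Result2, the names AfterRequest, AfterResult1, NamedStateSketch, and the bodies of partial1 and partial2.

```scala
// Placeholder domain types; the real ones are whatever the application defines.
final case class Request(id: Int)
final case class Result1(text: String)
final case class Result2(size: Int)
final case class Result(request: Request, result1: Result1, result2: Result2)

final case class WithState[+TState, +TValue](value: TValue, state: TState) {
  def map[TResult](func: TValue => TResult): WithState[TState, TResult] =
    WithState(func(value), state)
}

// Named intermediate states replacing (x.state, x.value) style tuples.
final case class AfterRequest[S](outer: S, request: Request)
final case class AfterResult1[S](outer: S, request: Request, result1: Result1)

object NamedStateSketch {
  // Stand-ins for partialResult1Flow / partialResult2Flow (hypothetical logic).
  def partial1[S](in: WithState[S, Request]): WithState[S, Result1] =
    in.map(r => Result1(s"r1-${r.id}"))
  def partial2[S](in: WithState[S, Request]): WithState[S, Result2] =
    in.map(r => Result2(r.id * 2))

  // Stash the request next to the caller's state before the first partial step.
  def rememberRequest[S](in: WithState[S, Request]): WithState[AfterRequest[S], Request] =
    WithState(in.value, AfterRequest(in.state, in.value))

  // Stash the first result, and put the request back as the value for step two.
  def rememberResult1[S](in: WithState[AfterRequest[S], Result1]): WithState[AfterResult1[S], Request] =
    WithState(in.state.request, AfterResult1(in.state.outer, in.state.request, in.value))

  // Build the final Result and hand the caller's original state back.
  def assemble[S](in: WithState[AfterResult1[S], Result2]): WithState[S, Result] =
    WithState(Result(in.state.request, in.state.result1, in.value), in.state.outer)

  def run[S](in: WithState[S, Request]): WithState[S, Result] = {
    val afterFirst  = partial1(rememberRequest(in))
    val afterSecond = partial2(rememberResult1(afterFirst))
    assemble(afterSecond)
  }
}
```

In the real flow, each named function would presumably replace the lambda in the corresponding .map(...) stage. It reads better than x.state._1._2, but the bookkeeping is all still there, which is the incidental complexity I mean.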
Am I missing something? Is this not a good use case for Akka Streams? Is there some built-in way of doing this?