TL;DR: is it better to materialize a stream per request (i.e. use short-lived streams) or to share a single stream materialization across requests, when the stream contains an outgoing HTTP request?
Details: I have a typical service that takes an HTTP request, scatters it to several 3rd-party downstream services (not controlled by me) and aggregates the results before sending them back. I'm using akka-http for the client side and spray for the server (legacy; it will move to akka-http over time). Schematically:
request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response
This can be achieved either by materializing a stream per request, or by materializing (parts of) the stream once and sharing it across requests.
Materializing per request incurs materialization overhead¹, and it is not clear how to leverage connection pools with it. The problem is described here (many materializations can exhaust the pool). I can wrap a pool in a long-running HTTP stream as shown here and put a mapAsync
"upstream", but the error-handling strategy is not clear to me. When a single request fails and its stream is terminated, would it take down the pool as well? Moreover, it seems I will need to reconcile requests with responses, since they are not returned in order.
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
  Flow[HttpRequest]
    .map(req => req -> UUID.randomUUID()) // the id doesn't matter here: there is a single request per stream
    .via(connectionFlow)
    .map { case (response, _) => response }
val results = (1 to 5).map { i =>
  Source.single(i)
    .map(i => HttpRequest(...))
    .via(httpFlow)
    .mapAsync(1) {
      // response handling logic
    }
    .runWith(Sink.last)
}
// example of stream per request with long running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]
val results = (1 to 5).map { i =>
  Source.single(i)
    .map(i => HttpRequest(...))
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
      // somehow reconcile request with response?
      // response handling logic
    }
    .runWith(Sink.last)
}
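For reference, a queueRequest implementation following the host-level docs page linked above looks roughly like this (host, port and the queue size are placeholders). The two points relevant to my questions: the pool flow emits Try[HttpResponse], so a failed request surfaces as a Failure element instead of failing the whole stream; and the Promise carried through the pool as the context object does the request/response reconciliation, so out-of-order responses are not a problem:

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

val host = "example.com" // placeholder
val port = 80            // placeholder

// one long-running materialization of the pool, shared by all callers
val poolFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)

val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](256, OverflowStrategy.dropNew)
    .via(poolFlow)
    .to(Sink.foreach {
      // the Promise travels through the pool as the context object, so each
      // response is matched back to its own request automatically
      case (Success(response), promise) => promise.success(response)
      // a per-request failure completes only this caller's Future;
      // the pool stream itself keeps running
      case (Failure(ex), promise)       => promise.failure(ex)
    })
    .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val promise = Promise[HttpResponse]()
  queue.offer(request -> promise).flatMap {
    case QueueOfferResult.Enqueued    => promise.future
    case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("queue overflowed"))
    case QueueOfferResult.Failure(ex) => Future.failed(ex)
    case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("queue closed"))
  }
}
```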
Sharing a stream across requests has a similar error-handling issue: it seems there are failure modes that can bring down that stream together with all in-flight requests. The code would be similar to the host-level API, but with the queue fronting the whole stream.
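On the error-handling concern for a shared stream: one option I'm aware of is a supervision strategy, which makes a stage drop the failing element and resume instead of terminating the stream. A minimal sketch (the exception type and the Flow stage are illustrative, not my real code):

```scala
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Flow

// Resume on expected, per-request failures so one bad element does not
// terminate the shared stream (and the pool behind it); stop on anything else.
val decider: Supervision.Decider = {
  case _: IllegalArgumentException => Supervision.Resume // drop the element, keep the stream alive
  case _                           => Supervision.Stop
}

// hypothetical per-request processing stage of the shared stream
val processing = Flow[Int]
  .map(identity) // placeholder for real per-request logic
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
```

The caveat is that Resume silently drops the in-flight element, so whoever is waiting on that request still needs a timeout or some other failure signal.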
Which way is better in this case?
I did try to implement both solutions, but there are many design choices at every stage of the implementation, so it seems easy to get it wrong even on the "right" path.
¹ Although I believe it is negligible, and it is the same way the akka-http server operates.