
TL;DR: is it better to materialize a stream per request (i.e. use short-lived streams) or to share a single stream materialization across requests, when the stream includes an outgoing HTTP request?

Details: I have a typical service that takes an HTTP request, scatters it to several 3rd-party downstream services (not controlled by me) and aggregates the results before sending them back. I'm using akka-http for the client implementation and spray for the server (legacy; will move to akka-http over time). Schematically:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregation -> response

This can be achieved either by materializing a stream per request, or by materializing (parts of) the stream once and sharing it across requests.

Materializing per request incurs materialization overhead¹, and it is not clear how to leverage connection pools with it. The problem is described here (many materializations can exhaust the pool). I can wrap a pool in a long-running HTTP stream like here and front it with a mapAsync "upstream", but the error-handling strategy is not clear to me: when a single request fails and its stream is terminated, would that take down the pool as well? Moreover, it seems I will need to reconcile requests with responses, since they are not returned in order.

// example of stream per request

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
    Flow[HttpRequest]
      .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
      .via(connectionFlow)
      .map { case (response, _) => response }

val results = (1 to 5).map { i =>
  Source.single(i)
    .map(_ => HttpRequest(...))
    .via(httpFlow)
    .mapAsync(1) {
      // response handling logic
    }
    .runWith(Sink.last)
}


// example of stream per request with long running http stream

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

val results = (1 to 5).map { i =>
  Source.single(i)
    .map(_ => HttpRequest(...))
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
      // somehow reconcile request with response?
      // response handling logic
    }
    .runWith(Sink.last)
}
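For reference, the queueRequest used above follows the queue pattern from the linked host-level docs. A sketch (host, port and bufferSize are placeholders; an implicit ActorSystem, materializer and ExecutionContext are assumed to be in scope):

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}

// The pool is typed with Promise[HttpResponse] so each response
// can be handed back to the caller that enqueued the request.
val poolFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port)

// One long-running materialization; requests enter through the queue.
val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](bufferSize, OverflowStrategy.dropNew)
    .via(poolFlow)
    .to(Sink.foreach {
      case (Success(resp), p) => p.success(resp)
      case (Failure(e), p)    => p.failure(e)
    })
    .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued    => responsePromise.future
    case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    case QueueOfferResult.Failure(ex) => Future.failed(ex)
    case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed."))
  }
}
```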

Sharing a stream across requests has a similar error-handling issue: it seems there are failure modes that can bring down that stream together with all requests in flight. The code would be similar to the host-level API, but with the queue fronting the whole stream.
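One way to keep a long-running shared stream alive past per-element failures is a supervision strategy that resumes (drops the failing element) instead of failing the whole stream. A minimal sketch of the mechanism, with a deliberately throwing stage standing in for request-handling logic (whether a given real failure is safely resumable depends on the stage that throws):

```scala
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.Flow

// Decide per exception whether to drop the element or stop the stream.
val decider: Supervision.Decider = {
  case _: RuntimeException => Supervision.Resume
  case _                   => Supervision.Stop
}

// With this attribute, an element that throws is dropped and the
// stream (and anything fused with it) keeps running for later elements.
val resilientFlow = Flow[Int]
  .map(i => if (i == 3) throw new RuntimeException(s"boom on $i") else i)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
```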

Which way is better in this case?

I did try to implement both solutions, but there are many design choices at every stage of implementation, so it seems easy to get something wrong even on the "right" path.

¹ Although I believe it is negligible, and it is the same way the akka-http server operates.


1 Answer


In general it is much better to use a single connection Flow and dispatch all of your requests through it. The primary reason is that a new materialization may actually result in a new connection being formed each time (depending on your connection pool settings).

You are correct that this results in a few complications:

Ordering: by providing a random UUID as the second value of the tuple you pass to the connection flow, you are giving up the ability to correlate a request with a response. That extra T value in the tuple can be used as a "correlation id" to know which HttpResponse you are getting back from the Flow. In your particular example you could use the initial Int from the Range you created:

// note: the pool must be typed with Int here, i.e.
// Http().cachedHostConnectionPool[Int](host, port)
val responseSource: Source[(Try[HttpResponse], Int), _] =
  Source
    .fromIterator(() => Iterator.range(0, 5))
    .map(i => HttpRequest(...) -> i)
    .via(connectionFlow)

Now each response comes with the original Int value which you can use to process the response.

Error Handling: you are incorrect in stating that "a single request fails and the stream is terminated". A single request failure does NOT necessarily fail the stream. Rather, you will simply get a (Failure(exception), Int) value from the connection flow. You then know which Int caused the failure, and you have the exception from the flow.
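Concretely, each element can be pattern matched on Success/Failure while the stream keeps running. A sketch, assuming the responseSource above and an implicit ActorSystem/materializer in scope:

```scala
import scala.util.{Failure, Success}
import akka.stream.scaladsl.Sink

responseSource.runWith(Sink.foreach {
  case (Success(response), id) =>
    // consume or discard the entity, otherwise the pooled connection can stall
    response.discardEntityBytes()
    println(s"request $id succeeded with ${response.status}")
  case (Failure(ex), id) =>
    // only this element failed; the stream itself continues
    println(s"request $id failed: ${ex.getMessage}")
})
```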