
I need to consume a REST service using Akka's HTTP client (v2.0.2). The logical approach is to do this via a host connection pool, because we expect large numbers of simultaneous connections. The Flow for this consumes an (HttpRequest, T) and returns a (Try[HttpResponse], T). The documentation indicates that some arbitrary type T is needed to handle responses that may arrive out of order relative to their requests, but it does not explain what the caller is supposed to do with the returned T.

My first attempt is the function below using an Int as T. It is called from many places to ensure that the connections use a single pool.

val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))

def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
  val unique = Random.nextInt
  Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
    case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
    case (Failure(f), `unique`) ⇒ Future.failed(f)
    case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
  }
}

The question is: how should the client use this T? Is there a cleaner, more efficient solution? And finally, is my worry that responses may arrive out of order justified, or is it just paranoia?


3 Answers

25
votes

I was a little confused by this myself initially, until I read through the docs a few times. If you only ever send single requests into the pool, then no matter how many different places share that same pool, the T that you are supplying (an Int in your case) doesn't matter. So if you are always using Source.single, that key can always be 1 if you really want.

Where it does come into play, though, is when a piece of code submits multiple requests into the pool at once and needs the responses to all of them. The reason is that the responses come back in the order they were received from the service being called, not in the order in which the requests were supplied to the pool. Each request can take a different amount of time, so the responses flow downstream to the Sink in the order they come back from the pool.
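To see why the key matters without a running service, here is a stdlib-only simulation (no Akka involved). `simulatedPool` is a hypothetical stand-in for the pool that assumes each request has a fixed latency and emits results in completion order, so the only reliable way to match a result to its request is the key that travels with it.

```scala
import scala.util.{Success, Try}

// Hypothetical stand-in for a host connection pool: each request carries an
// assumed latency (ms), and results are emitted in completion order
// (shortest latency first), NOT in submission order.
def simulatedPool[T](in: Seq[((String, Int), T)]): Seq[(Try[String], T)] =
  in.sortBy { case ((_, latencyMs), _) => latencyMs }
    .map { case ((path, _), key) => (Success(s"response for $path"), key) }

// (path, assumed latency) paired with the caller's key T (here an Int id)
val submitted = Seq(
  (("/product/1", 300), 1),
  (("/product/2", 100), 2),
  (("/product/3", 200), 3)
)

val emitted = simulatedPool(submitted)
// The keys arrive in a different order than they were submitted;
// the key is what lets us match each result back to its request.
val keysInArrivalOrder = emitted.map(_._2)
```

Here the keys come back as 2, 3, 1 even though they were submitted as 1, 2, 3.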

Say we had a service out there that accepted GET requests with a URL in the form:

/product/123

where the 123 part is the id of the product you want to look up. If I wanted to look up products 1-10 all at once, with a separate request for each, this is where the identifier becomes important: it lets me correlate each HttpResponse with the product id it belongs to. A simplified code example for this scenario would be as follows:

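// Pair each request with the product id it is for; `1 to 10` covers ids 1-10 inclusive
val requests = for (id <- 1 to 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut: Future[Map[Int, HttpResponse]] =
  Source(requests)
    .via(pool)
    .runFold(Map.empty[Int, HttpResponse]) {
      // The id travels with the response, so it can be used as the map key
      case (m, (util.Success(resp), id)) =>
        m + (id -> resp)

      case (m, (util.Failure(ex), id)) =>
        // Log the failure for `id` here; it is simply left out of the map
        m
    }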

When I get my responses in the fold, I also conveniently have the id that each one is associated with, so I can add them to my Map keyed by id. Without this functionality, I would probably have to do something like parse the body (if it were JSON) to figure out which response was which. That is not ideal, and it doesn't cover the failure case at all. With this solution, I know which requests failed, because I still get the identifier back.
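The failure-tracking point can be sketched with plain Scala collections, assuming the results have already been collected from the pool as `(Try[response], id)` pairs. Responses are modelled as strings and the data is made up for illustration; the fold splits successes from failures while keeping the id of each.

```scala
import scala.util.{Failure, Success, Try}

// Results as they might arrive from the pool: (Try[response], id), out of order.
// Responses are modelled as plain strings; no real HTTP is involved.
val arrived: Seq[(Try[String], Int)] = Seq(
  (Success("body of product 3"), 3),
  (Failure(new RuntimeException("timeout")), 1),
  (Success("body of product 2"), 2)
)

// Successful bodies go into a map keyed by id; failed ids are remembered too.
val (succeeded, failed) = arrived.foldLeft((Map.empty[Int, String], Set.empty[Int])) {
  case ((ok, ko), (Success(body), id)) => (ok + (id -> body), ko)
  case ((ok, ko), (Failure(_), id))    => (ok, ko + id)
}
```

Because the id comes back even on failure, `failed` tells you exactly which product lookups need a retry or a log entry.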

I hope that clarifies things a bit for you.

7
votes

Akka HTTP connection pools are powerful allies when consuming HTTP-based resources. If you are going to execute one request at a time, then a solution is:

def exec(req: HttpRequest): Future[HttpResponse] = {
  Source.single(req → 1)
    .via(pool)
    .runWith(Sink.head).flatMap {
      case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
      case (Failure(f), _) ⇒ Future.failed(f)
    }
}
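The `flatMap` in `exec` only drops the key and lifts the `Try` into the `Future`. A stdlib-only sketch of that step, modelling the element that `Sink.head` yields as a `(Try[String], Int)` and using `Future.fromTry` (part of the Scala standard library since 2.11); `unwrap` is a hypothetical helper name:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

// Stand-ins for what Sink.head would produce; the response is modelled as a String.
val single: Future[(Try[String], Int)] = Future.successful((Success("ok"), 1))
val singleFailed: Future[(Try[String], Int)] =
  Future.successful((Failure(new RuntimeException("refused")), 1))

// Ignore the key and turn Success/Failure into a successful/failed Future.
def unwrap(f: Future[(Try[String], Int)]): Future[String] =
  f.flatMap { case (t, _) => Future.fromTry(t) }
```

This behaves like the two `case` branches above, just written more compactly.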

Because you are executing a single request, there is no need to disambiguate the response. However, Akka Streams are clever: you can submit multiple requests to the pool at the same time. In this case we pass in an Iterable[HttpRequest] and tag each request with its index. The results are collected into a SortedMap keyed by that index, so the returned responses are in the same order as the original requests, and you can simply zip requests with responses to line things up:

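def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
  // Tag each request with its position; a List (not a Map) keeps identical requests distinct
  Source(requests.zipWithIndex.toList)
    .via(pool)
    // Collect results keyed by position so they come back in request order
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
      case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
    }.map(r ⇒ r.values)
}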
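The reordering step does not depend on Akka at all, so it can be checked with plain Scala. This sketch assumes responses arrive out of order, each tagged with the index of its request (strings stand in for `HttpRequest`/`HttpResponse`):

```scala
import scala.collection.immutable.SortedMap

val requests = Vector("/a", "/b", "/c")
// Out-of-order arrivals, each tagged with the index of the request it answers.
val arrivals = Seq(("resp /c", 2), ("resp /a", 0), ("resp /b", 1))

// Insert by index; a SortedMap iterates its values in ascending key order.
val ordered: SortedMap[Int, String] =
  arrivals.foldLeft(SortedMap.empty[Int, String]) { case (m, (resp, idx)) => m + (idx -> resp) }

// Because `ordered.values` follows request order, zip pairs each request with its response.
val paired = requests.zip(ordered.values)
```

One design note: the zip is only safe if every index is present. If a result were missing from the map, everything after the gap would shift and be paired with the wrong request.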

A future of iterable futures is useful if you need to unpack things your own way. A simpler result can be obtained by flattening things:

def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
  // Tag each request with its position; a List (not a Map) keeps identical requests distinct
  Source(requests.zipWithIndex.toList)
    .via(pool)
    .runFold(SortedMap[Int, Future[HttpResponse]]()) {
      case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
      case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
    }.flatMap(r ⇒ Future.sequence(r.values)) // fails as a whole if any single request failed
}
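One consequence of flattening with `Future.sequence` is that a single failed request fails the whole result, and the successful responses are no longer reachable through it. A stdlib-only sketch of that behaviour, with strings standing in for responses:

```scala
import scala.collection.immutable.SortedMap
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Try

// Per-request results keyed by index, as execFlatten collects them.
val allOk = SortedMap(0 -> Future.successful("r0"), 1 -> Future.successful("r1"))
val oneFailed = allOk + (2 -> Future.failed[String](new RuntimeException("boom")))

// Future.sequence succeeds only if every element succeeds.
val okResult  = Try(Await.result(Future.sequence(allOk.values.toList), 1.second))
val badResult = Try(Await.result(Future.sequence(oneFailed.values.toList), 1.second))
```

If partial results matter to you, the unflattened `exec` variant keeps each response's outcome separate.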

I have made this gist with all the imports and wrappers to make a client for consuming HTTP services.

A special thanks to @cmbaxter for his neat example.

0
votes

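There is an open ticket for improving the akka-http documentation about this. In the meantime, please check this example, which uses a Promise as the T so that each caller gets its own response back: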

val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
    // Complete the caller's promise with whatever the pool returned for it
    case (Success(resp), p) => p.success(resp)
    case (Failure(e), p)    => p.failure(e)
  }))(Keep.left)
  .run()


val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise

val response = queue.offer(request).flatMap(buffered => {
  if (buffered) promise.future
  else Future.failed(new RuntimeException("Request was dropped: the queue is full"))
})
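The Promise-as-key idea can be illustrated without Akka. In this sketch, `fakePoolWithSink` is a hypothetical stand-in for the pool plus the `Sink.foreach` above: it processes `(request, promise)` pairs in reverse order and completes each promise with the result that carries it. Each caller only awaits its own promise's future, so the processing order no longer matters.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Hypothetical stand-in for "pool + Sink.foreach": results are produced in
// reverse order, and each one completes the promise that travelled with it.
def fakePoolWithSink(batch: Seq[(String, Promise[String])]): Unit =
  batch.reverse.foreach { case (req, p) =>
    val result: Try[String] = Success(s"response to $req")
    p.complete(result)
  }

val pA = Promise[String]()
val pB = Promise[String]()
fakePoolWithSink(Seq("/a" -> pA, "/b" -> pB))

// Each caller awaits only its own promise, regardless of completion order.
val a = Await.result(pA.future, 1.second)
val b = Await.result(pB.future, 1.second)
```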