I need to consume a REST service using Akka's HTTP client (v2.0.2). The logical approach is to do this via a host connection pool because we expect large numbers of simultaneous connections. The Flow
for this consumes a (HttpRequest, T)
and returns a (Try[HttpResponse, T)
. The documentation indicates that some arbitrary type T
is needed to manage potential out of order responses to requests but does not point out what the caller is supposed to do with the returned T
.
My first attempt is the function below using an Int
as T
. It is called from many places to ensure that the connections use a single pool.
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
case (Failure(f), `unique`) ⇒ Future.failed(f)
case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
}
}
The question is how should the client use this T
? Is there a cleaner more efficient solution? And finally, Is my paranoia that something may arrive out of order not actually paranoia?