We started to implement the Source.queue[HttpRequest]
pattern mentioned in the docs: http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples
This is the (reduced) example from the documentation
val poolClientFlow = Http()
.cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")
val queue =
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew
)
.via(poolClientFlow)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run()
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => Future.failed(ex)
case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
}
}
val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
The docs state that using Source.single(request)
is an anti-pattern and should be avoid. However it doesn't clarify why and what implications come by using Source.queue
.
At this place we previously showed an example that used the
Source.single(request).via(pool).runWith(Sink.head)
. In fact, this is an anti-pattern that doesn’t perform well. Please either supply requests using a queue or in a streamed fashion as shown below.
Advantages of Source.queue
- The flow is only materialized once ( probably a performance gain? ). However if I understood the akka-http implementation correctly, a new flow is materialized for each connection, so this doesn't seem to be that much of a problem
- Explicit backpressure handling with
OverflowStrategy
and matching over theQueueOfferResult
Issues with Source.queue
These are the questions that came up, when we started implementing this pattern in our application.
Source.queue is not thread-safe
The queue implementation is not thread safe. When we use the queue in different routes / actors we have this scenario that:
A enqueued request can override the latest enqueued request, thus leading to an unresolved Future.
UPDATE
This issue as been addressed in akka/akka/issues/23081. The queue is in fact thread safe.
Filtering?
What happens when request are being filtered? E.g. when someone changes the implementation
Source.queue[(HttpRequest, Promise[HttpResponse])](
QueueSize, OverflowStrategy.dropNew)
.via(poolClientFlow)
// only successful responses
.filter(_._1.isSuccess)
// failed won't arrive here
.to(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))
Will the Future not resolve? With a single request flow this is straightforward:
Source.single(request).via(poolClientFlow).runWith(Sink.headOption)
QueueSize vs max-open-request?
The difference between the QueueSize
and max-open-requests
is not clear. In the end, both are buffers. Our implementation ended up using QueueSize == max-open-requests
What's the downside for Source.single()?
Until now I have found two reasons for using Source.queue
over Source.single
- Performance - materializing the flow only once. However according to this answer it shouldn't be an issue
- Explicitly configuring backpressure and handle failure cases. In my opinion the ConnectionPool has a sufficient handling for too much load. One can map over the resulting future and handle the exceptions.
thanks in advance, Muki