
We started to implement the Source.queue[HttpRequest] pattern mentioned in the docs: http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples

This is the (reduced) example from the documentation

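import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

implicit val system: ActorSystem = ActorSystem() // Akka 2.6+: also provides the Materializer
import system.dispatcher

val QueueSize = 10

val poolClientFlow = Http()
  .cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")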

val queue =
  Source.queue[(HttpRequest, Promise[HttpResponse])](
     QueueSize, OverflowStrategy.dropNew
  )
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p))    => p.failure(e)
    }))(Keep.left)
    .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued    => responsePromise.future
    case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    case QueueOfferResult.Failure(ex) => Future.failed(ex)
    case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
  }
}

val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))
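To see the pattern run without network access, here is a minimal self-contained sketch. It assumes Akka 2.6+ (an implicit ActorSystem supplies the Materializer) and replaces cachedHostConnectionPool with a hypothetical stubPoolFlow of the same shape: it takes (request, context) pairs and emits (Try[response], context) pairs. Plain strings stand in for HttpRequest/HttpResponse, and the request "boom" fails on purpose.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Assumes Akka 2.6+: the implicit ActorSystem also provides the Materializer.
implicit val system: ActorSystem = ActorSystem("queue-sketch")
import system.dispatcher

// Hypothetical stand-in for Http().cachedHostConnectionPool: same
// (request, context) => (Try[response], context) shape, but no network.
val stubPoolFlow = Flow[(String, Promise[String])].map { case (request, promise) =>
  val response: Try[String] =
    if (request == "boom") Failure(new RuntimeException("boom"))
    else Success(request.toUpperCase)
  (response, promise)
}

// Materialized once; every caller shares this queue.
val queue =
  Source.queue[(String, Promise[String])](16, OverflowStrategy.dropNew)
    .via(stubPoolFlow)
    .toMat(Sink.foreach[(Try[String], Promise[String])] {
      case (Success(resp), p) => p.success(resp) // complete the caller's Promise
      case (Failure(e), p)    => p.failure(e)
    })(Keep.left)
    .run()

def queueRequest(request: String): Future[String] = {
  val responsePromise = Promise[String]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued    => responsePromise.future
    case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    case QueueOfferResult.Failure(ex) => Future.failed(ex)
    case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed."))
  }
}

// Usage: one successful and one failing request through the same stream.
val ok = Await.result(queueRequest("hello"), 3.seconds)
val failed = Try(Await.result(queueRequest("boom"), 3.seconds))
Await.result(system.terminate(), 5.seconds)
```

Await is used only to keep the demo linear; in application code you would compose the returned Futures instead.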

The docs state that using Source.single(request) is an anti-pattern and should be avoided. However, they don't explain why, or what the implications of using Source.queue are.

The docs say: "At this place we previously showed an example that used the Source.single(request).via(pool).runWith(Sink.head). In fact, this is an anti-pattern that doesn’t perform well. Please either supply requests using a queue or in a streamed fashion as shown below."

Advantages of Source.queue

  • The flow is only materialized once (probably a performance gain?). However, if I understood the akka-http implementation correctly, a new flow is materialized for each connection, so this doesn't seem to be much of an issue.
  • Explicit backpressure handling with OverflowStrategy and matching over the QueueOfferResult
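The second point can be observed in isolation with a small sketch (Akka 2.6+ assumed; the buffer size of 2 and the ten offers are arbitrary). A deliberately stalled downstream, here a mapAsync whose futures never complete, used only to stop demand, lets the buffer fill up. After that, OverflowStrategy.dropNew answers further offers with QueueOfferResult.Dropped instead of blocking the caller.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("overflow-sketch")

// Buffer of 2; downstream takes one element and then never signals demand again.
val queue =
  Source.queue[Int](2, OverflowStrategy.dropNew)
    .mapAsync(parallelism = 1)(_ => Promise[Int]().future) // never completes: stalls the stream
    .to(Sink.ignore)
    .run()

// Offer sequentially and record each QueueOfferResult.
val results: Seq[QueueOfferResult] =
  (1 to 10).map(i => Await.result(queue.offer(i), 3.seconds))

results.foreach(println) // Enqueued for the first few offers, then Dropped
Await.result(system.terminate(), 5.seconds)
```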

Issues with Source.queue

These are the questions that came up when we started implementing this pattern in our application.

Source.queue is not thread-safe

The queue implementation is not thread-safe. When we use the queue from different routes / actors, we run into the following scenario:

An enqueued request can override the most recently enqueued request, leaving a Future that never resolves.

UPDATE

This issue has been addressed in akka/akka/issues/23081. The queue is in fact thread-safe.

Filtering?

What happens when requests are filtered? For example, when someone changes the implementation to:

Source.queue[(HttpRequest, Promise[HttpResponse])](
    QueueSize, OverflowStrategy.dropNew)
  .via(poolClientFlow)
  // only successful responses
  .filter(_._1.isSuccess)
  // failed won't arrive here
  .to(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))

Will the Future then never resolve? With a single-request flow this is straightforward:

Source.single(request).via(poolClientFlow).runWith(Sink.headOption)
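A small sketch of what happens (Akka 2.6+ assumed; stubPoolFlow is a hypothetical stand-in for the pool, and a plain Source of (request, promise) pairs replaces the queue because the effect is the same): an element removed by filter never reaches the Sink, so its Promise is never completed and its Future stays pending forever. The unfiltered run shows the alternative: every element reaches the Sink, every Promise is completed, and a failure arrives as a failed Future the caller can handle.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

implicit val system: ActorSystem = ActorSystem("filter-sketch")

type Ctx = Promise[String]

// Hypothetical stand-in for the pool flow: "boom" fails, anything else succeeds.
val stubPoolFlow = Flow[(String, Ctx)].map { case (request, p) =>
  val response: Try[String] =
    if (request == "boom") Failure(new RuntimeException("boom")) else Success(request)
  (response, p)
}

// Completes the Promise of every element that actually reaches it.
val completePromises = Sink.foreach[(Try[String], Ctx)] {
  case (Success(resp), p) => p.success(resp)
  case (Failure(e), p)    => p.failure(e)
}

// Runs requests through the stub pool, optionally filtering out failures
// between the pool and the Sink (the change described in the question).
def run(requests: List[String], dropFailures: Boolean): (Future[Done], List[Future[String]]) = {
  val withPromises = requests.map(r => r -> Promise[String]())
  val afterPool = Source(withPromises).via(stubPoolFlow)
  val maybeFiltered = if (dropFailures) afterPool.filter(_._1.isSuccess) else afterPool
  (maybeFiltered.runWith(completePromises), withPromises.map(_._2.future))
}

val (filteredDone, filtered) = run(List("ok", "boom"), dropFailures = true)
val (plainDone, plain) = run(List("ok", "boom"), dropFailures = false)
Await.result(filteredDone, 3.seconds) // the stream itself completes normally...
Await.result(plainDone, 3.seconds)

val filteredBoomCompleted = filtered(1).isCompleted // ...but this stays false forever
val plainBoom = plain(1).value                      // Some(Failure(java.lang.RuntimeException: boom))
Await.result(system.terminate(), 5.seconds)
```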

QueueSize vs max-open-requests?

The difference between QueueSize and max-open-requests is not clear. In the end, both are buffers. Our implementation ended up using QueueSize == max-open-requests.
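If QueueSize should follow max-open-requests, one option is to read both from the same configuration. A minimal sketch using Typesafe Config, with a hypothetical inline override of akka.http.host-connection-pool.max-open-requests; for the pool to actually use the override, the same Config has to be passed to the ActorSystem (for example ActorSystem("app", config)).

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical override; in a real application this would live in application.conf.
val config: Config = ConfigFactory
  .parseString("akka.http.host-connection-pool.max-open-requests = 64")
  .withFallback(ConfigFactory.load())

// The setting read by akka-http's host-level connection pool.
val maxOpenRequests: Int = config.getInt("akka.http.host-connection-pool.max-open-requests")

// QueueSize == max-open-requests, as in the question; any other ratio is a tuning choice.
val QueueSize: Int = maxOpenRequests
println(s"QueueSize = $QueueSize") // QueueSize = 64
```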

What's the downside of Source.single()?

So far I have found two reasons for using Source.queue over Source.single:

  1. Performance: the flow is materialized only once. However, according to this answer, it shouldn't be an issue.
  2. Explicitly configuring backpressure and handling failure cases. In my opinion, the ConnectionPool already handles excessive load sufficiently. One can map over the resulting future and handle the exceptions.
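For comparison, a sketch of the per-request variant described in point 2: each call materializes its own short stream with Source.single, and failures are handled by mapping over the resulting Future. Akka 2.6+ is assumed, stubPoolFlow is a hypothetical stand-in for the pool, and NotUsed serves as the (unused) context value.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

implicit val system: ActorSystem = ActorSystem("single-sketch")
import system.dispatcher

// Hypothetical stand-in for the pool flow, with NotUsed as the context type.
val stubPoolFlow = Flow[(String, NotUsed)].map { case (request, ctx) =>
  val response: Try[String] =
    if (request == "boom") Failure(new RuntimeException("boom")) else Success(request.toUpperCase)
  (response, ctx)
}

// One materialization per request: no queue and no QueueOfferResult to match on.
def singleRequest(request: String): Future[String] =
  Source.single(request -> NotUsed)
    .via(stubPoolFlow)
    .runWith(Sink.head)
    .flatMap { case (response, _) => Future.fromTry(response) }

// Failure handling happens on the resulting Future.
val handled: Future[String] =
  singleRequest("boom").recover { case e: RuntimeException => s"fallback after ${e.getMessage}" }

val ok = Await.result(singleRequest("hello"), 3.seconds)
val fallback = Await.result(handled, 3.seconds)
Await.result(system.terminate(), 5.seconds)
```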

Thanks in advance, Muki

Muki, there's one materialization per connection, not per request. - Heiko Seeberger
Thanks Heiko :) I changed the description. - Muki

1 Answer


I'll answer each of your questions directly and then give a general indirect answer to the overall problem.

probably a performance gain?

You are correct that a Flow is materialized for each IncomingConnection, but there is still a performance gain to be had if a connection carries multiple requests.

What happens when requests are being filtered?

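In general, streams do not have a 1:1 mapping between Source elements and Sink elements. There can be 1:0, as in your example, in which case the Promise attached to the dropped request is never completed and its Future never resolves. Or there can be 1:many, if a single request somehow spawned multiple responses.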
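Both cardinalities can be seen with ordinary stream stages; a minimal sketch, assuming Akka 2.6+, with integers instead of requests:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("cardinality-sketch")

// 1:0, filter drops an element, so nothing downstream ever sees it.
val oneToZero = Await.result(Source(List(1, 2, 3)).filter(_ != 2).runWith(Sink.seq), 3.seconds)

// 1:many, mapConcat turns one element into several.
val oneToMany = Await.result(Source(List(1, 2)).mapConcat(n => List.fill(n)(n)).runWith(Sink.seq), 3.seconds)

println(oneToZero)
println(oneToMany)
Await.result(system.terminate(), 5.seconds)
```

Any code that assumes "one element in, one element out" (such as completing one Promise per request in the Sink) silently breaks as soon as a stage like these is placed in between.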

QueueSize vs max-open-requests?

This ratio would depend on the speed with which elements are being offered to the queue and the speed with which http requests are being processed into responses. There is no pre-defined ideal solution.

GENERAL REDESIGN

In most cases a Source.queue is used because some upstream function is creating input elements dynamically and then offering them to the queue, e.g.

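val queue = ??? //as in the example in your question: it expects (request, promise) pairs

queue.offer(httpRequest1 -> Promise[HttpResponse]())
queue.offer(httpRequest2 -> Promise[HttpResponse]())
queue.offer(httpRequest3 -> Promise[HttpResponse]())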

This is poor design because whatever entity or function is being used to create each input element could itself be part of the stream's Source, e.g.

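import akka.NotUsed
import scala.util.Try

val allRequests = List(httpRequest1, httpRequest2, httpRequest3)

//no queue necessary; the pool flow takes (request, context) pairs and emits
//(Try[HttpResponse], context) pairs, so NotUsed serves as an empty context here
val allResponses : Future[Seq[Try[HttpResponse]]] = 
  Source(allRequests)
    .map(request => request -> NotUsed)
    .via(Http().cachedHostConnectionPool[NotUsed]("akka.io"))
    .map { case (response, _) => response }
    .runWith(Sink.seq)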

Now there is no need to worry about the queue, max queue size, etc. Everything is bundled into a nice compact stream.

Even if the source of requests is dynamic, you can still usually use a Source. Say we are reading the request paths from stdin; this can still be a complete stream:

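import scala.io.{Source => ioSource}
import scala.util.Try
import akka.NotUsed
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, HttpResponse, Uri}

val consoleLines : () => Iterator[String] = 
  () => ioSource.stdin.getLines()

Source
  .fromIterator(consoleLines)
  .map(consoleLine => HttpRequest(HttpMethods.GET, uri = Uri(consoleLine)) -> NotUsed)
  .via(Http().cachedHostConnectionPool[NotUsed]("akka.io"))
  // in real code, consume or discard each response entity so the pool is not stalled
  .to(Sink.foreach[(Try[HttpResponse], NotUsed)] { case (response, _) => println(response) })
  .run()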

Now, even if each line is typed into the console at random intervals, the stream can still behave reactively without a queue.

The only case where I've ever seen a queue, or Source.actorRef, be absolutely necessary is when you have to create a callback function that gets passed into a third-party API. This callback function has to offer the incoming elements to the queue.
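A minimal sketch of that callback case, assuming Akka 2.6+. ThirdPartyEventApi is a hypothetical API that only lets you register a callback; the callback bridges into the stream by offering each element to the queue, and Sink.seq collects what arrives once the queue is completed.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("callback-sketch")

// Hypothetical third-party API that only offers callback registration.
class ThirdPartyEventApi {
  private var listener: String => Unit = _ => ()
  def register(callback: String => Unit): Unit = listener = callback
  def simulateEvents(events: String*): Unit = events.foreach(listener)
}

val (queue, received) =
  Source.queue[String](16, OverflowStrategy.dropNew)
    .toMat(Sink.seq)(Keep.both)
    .run()

val api = new ThirdPartyEventApi
// The callback offers each event to the queue. The Future[QueueOfferResult]
// is ignored here for brevity; real code should check it for Dropped etc.
api.register { event => queue.offer(event); () }

api.simulateEvents("a", "b", "c")
queue.complete() // no more events: let the stream finish

val events = Await.result(received, 3.seconds)
Await.result(system.terminate(), 5.seconds)
```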