4 votes

I'm currently trying to read a paginated HTTP resource. Each page is a multipart document, and the response for a page includes a next link in its headers if there is a page with more content. An automated parser can then start at the oldest page and read page by page, using the headers to construct the request for the next page.

I'm using Akka Streams and Akka HTTP for the implementation, because my goal is to create a streaming solution. I came up with this (I will only include the relevant parts of the code here; feel free to have a look at this gist for the whole code):

// Emit one HttpResponse per page; crawl computes the next request from each response.
def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

// Unmarshal each response to a multipart document and flatten it into its parts.
val parse: Flow[HttpResponse, Multipart.General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

// One crawl step: perform the request and decide whether there is a next page.
def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      // On failure, emit the response but provide no follow-up request.
      if (response.status.isFailure()) Some((None, response))
      // Otherwise emit the response together with the request for the next page, if any.
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None) // no further request: complete the source
}
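
The nextRequest helper is not shown here; a minimal sketch of one possible implementation, assuming the server advertises pagination via a standard Link header with rel="next":

import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Link, LinkParams}

def nextRequest(response: HttpResponse, method: HttpMethod): Option[(Option[HttpRequest], HttpResponse)] = {
  // Look for a Link header value tagged rel="next".
  val nextUri = response.header[Link]
    .flatMap(_.values.find(_.params.contains(LinkParams.next)))
    .map(_.uri)
  // Always emit the current response; only provide a follow-up request if a next page exists.
  Some((nextUri.map(uri => HttpRequest(method = method, uri = uri)), response))
}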

So the general idea is to use Source.unfoldAsync to crawl through the pages and to do the HTTP requests (the idea and implementation are very close to what's described in this answer). This will create a Source[HttpResponse, _] that can then be consumed (unmarshalled to Multipart, split up into the individual parts, ...).
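
Put together, the consumption could look roughly like this (a sketch; the URI and the println sink are placeholders):

read(HttpRequest(uri = "http://example.com/pages/oldest"))
  .via(parse)                                           // HttpResponse -> Multipart.General.BodyPart
  .runForeach(part => println(part.entity.contentType)) // placeholder consumer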

My problem now is that the consumption of the HttpResponses might take a while (unmarshalling takes some time if the pages are large, and maybe there will be some database requests at the end to persist some data, ...). So I would like Source.unfoldAsync to backpressure if the downstream is slower. By default, the next HTTP request is started as soon as the previous one has finished.

So my question is: Is there some way to make Source.unfoldAsync backpressure on a slow downstream? If not, is there an alternative that makes backpressure possible?

I can imagine a solution that makes use of the host-level client-side API that akka-http provides, as described here, together with a cyclic graph where the response of the first request is used as input to generate the second request, but I haven't tried that yet and I'm not sure whether this could work.
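
For reference, the host-level ingredient would be a pooled connection flow, which backpressures when the pool is saturated; the cyclic feedback of responses into new requests would still have to be built around it (a sketch, with a placeholder host):

import scala.util.Try
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.Flow

// Requests are paired with a correlation token (Int here) so responses can be matched up.
val poolFlow: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
  Http().cachedHostConnectionPool[Int]("example.com")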


EDIT: After some days of playing around and reading the docs and some blogs, I'm not sure whether I was on the right track with my assumption that the backpressure behavior of Source.unfoldAsync is the root cause. To add some more observations:

  • When the stream is started, I see several requests going out. This is no problem in the first place, as long as the resulting HttpResponse is consumed in a timely fashion (see here for a description).
  • If I don't change the default response-entity-subscription-timeout, I will run into the following error (I stripped out the URLs; see the config sketch after this list for how the timeout can be raised):
    [WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked
    This leads to an IllegalStateException that terminates the stream: java.lang.IllegalStateException: Substream Source cannot be materialized more than once
  • I observed that the unmarshalling of the response is the slowest part in the stream, which might make sense because the response body is a multipart document and thereby relatively large. However, I would expect this part of the stream to signal less demand to the upstream (which is the Source.unfoldAsync part in my case), leading to fewer requests being made.
  • Some googling led me to a discussion about an issue that seems to describe a similar problem. They also discuss the problems that occur when a response is not processed fast enough. The associated merge request will bring documentation changes that propose to completely consume the HttpResponse before continuing with the stream. In the discussion of the issue there are also doubts about whether it's a good idea to combine Akka HTTP with Akka Streams. So maybe I would have to change the implementation to do the unmarshalling directly inside the function that's called by unfoldAsync.
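
For completeness, the subscription timeout mentioned in the second observation could be raised like this (a sketch; note that this treats the symptom rather than the cause):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Raise the 1-second default so a slow consumer gets more time to subscribe
// to the response entity.
val config = ConfigFactory.parseString(
  "akka.http.host-connection-pool.response-entity-subscription-timeout = 10s"
).withFallback(ConfigFactory.load())
implicit val system: ActorSystem = ActorSystem("crawler", config)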

2 Answers

1 vote

According to the implementation of Source.unfoldAsync, the passed-in function is only called when the source is pulled:

def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)

So if the downstream is not pulling (i.e. is backpressuring), the function passed to the source is not called.

In your gist you use runForeach (which is the same as runWith(Sink.foreach)), which pulls the upstream again as soon as the println has finished. So it is hard to notice backpressure there.

Try changing your example to runWith(Sink.queue), which will give you a SinkQueueWithCancel as the materialized value. Then, unless you call pull on the queue, the stream will be backpressured and will not issue requests.
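
A minimal sketch of that, reusing read and parse from the question (implicit materializer and execution context assumed to be in scope):

import scala.concurrent.Future
import akka.stream.scaladsl.{Sink, SinkQueueWithCancel}

val queue: SinkQueueWithCancel[Multipart.General.BodyPart] =
  read(request).via(parse).runWith(Sink.queue())

// Nothing is requested upstream until pull() is called; each pull signals demand.
def drain(): Future[Unit] =
  queue.pull().flatMap {
    case Some(part) => part.entity.discardBytes().future().flatMap(_ => drain())
    case None       => Future.successful(()) // stream completed
  }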

Note that there could be one or more initial requests until the backpressure has propagated through the whole stream.

0 votes

I think I figured it out. As I already mentioned in the edit of my question, I found this comment on an issue in Akka HTTP, where the author says:

...it is simply not best practice to mix Akka http into a larger processing stream. Instead, you need a boundary around the Akka http parts of the stream that ensures they always consume their response before allowing the outer processing stream to proceed.

So I went ahead and tried it: instead of doing the HTTP request and the unmarshalling in different stages of the stream, I directly unmarshal the response by flatMap-ing the Future[HttpResponse] into a Future[Multipart.General]. This makes sure that the HttpResponse is consumed right away and avoids the "Response entity was not subscribed after 1 second" errors. The crawl function looks slightly different now, because it has to return the unmarshalled Multipart.General object (for further processing) as well as the original HttpResponse (to be able to construct the next request out of its headers):

def crawl(reqOption: Option[HttpRequest])(implicit actorSystem: ActorSystem, materializer: Materializer, executionContext: ExecutionContext): Future[Option[(Option[HttpRequest], (HttpResponse, Multipart.General))]] = {
  reqOption match {
    case Some(request) =>
      Http().singleRequest(request)
        // Unmarshal right away so the response entity is fully consumed here.
        .flatMap(response => Unmarshal(response).to[Multipart.General].map(multipart => (response, multipart)))
        .map {
          case tuple @ (response, multipart) =>
            // On failure, emit the tuple but provide no follow-up request.
            if (response.status.isFailure()) Some((None, tuple))
            // Otherwise thread the multipart through alongside the response.
            else nextRequest(response, HttpMethods.GET).map { case (req, res) => (req, (res, multipart)) }
        }
    case None => Future.successful(None)
  }
}

The rest of the code has to change accordingly. I created another gist that contains code equivalent to the gist from the original question.
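
Sketched out, the read and parse pieces now carry the already-unmarshalled multipart alongside the response, and the parse step shrinks to extracting the parts (names as in the question, implicits assumed to be in scope):

def read(request: HttpRequest): Source[(HttpResponse, Multipart.General), _] =
  Source.unfoldAsync[Option[HttpRequest], (HttpResponse, Multipart.General)](Some(request))(req => crawl(req))

val parse: Flow[(HttpResponse, Multipart.General), Multipart.General.BodyPart, _] =
  Flow[(HttpResponse, Multipart.General)].flatMapConcat { case (_, multipart) => multipart.parts }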

I was expecting the two Akka projects to integrate better (the docs don't mention this limitation at the moment, and the HTTP API even seems to encourage using Akka HTTP and Akka Streams together), so this feels a bit like a workaround, but it solves my problem for now. I still have to figure out some other problems I encounter when integrating this part into my larger use case, but that's not part of this question.