I'm currently trying to read a paginated HTTP resource. Each page is a Multipart document, and the response for a page includes a next link in the headers if there is a page with more content. An automated parser can then start at the oldest page and read page by page, using the headers to construct the request for the next page.
I'm using Akka Streams and Akka HTTP for the implementation, because my goal is to create a streaming solution. I came up with this (I will include only the relevant parts of the code here; feel free to have a look at this gist for the whole code):
def read(request: HttpRequest): Source[HttpResponse, _] =
  Source.unfoldAsync[Option[HttpRequest], HttpResponse](Some(request))(Crawl.crawl)

val parse: Flow[HttpResponse, General.BodyPart, _] = Flow[HttpResponse]
  .flatMapConcat(r => Source.fromFuture(Unmarshal(r).to[Multipart.General]))
  .flatMapConcat(_.parts)

....

def crawl(reqOption: Option[HttpRequest]): Future[Option[(Option[HttpRequest], HttpResponse)]] = reqOption match {
  case Some(req) =>
    Http().singleRequest(req).map { response =>
      if (response.status.isFailure()) Some((None, response))
      else nextRequest(response, HttpMethods.GET)
    }
  case None => Future.successful(None)
}
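
The nextRequest helper is not shown here (it's part of the gist); it just extracts the next link from the response headers and builds the follow-up request. Purely as an illustration, a minimal sketch of such a helper, assuming the next page is advertised via a standard Link header with rel="next" (the actual header format may differ):

import akka.http.scaladsl.model.{HttpMethod, HttpRequest, HttpResponse}
import akka.http.scaladsl.model.headers.{Link, LinkParams}

// Simplified sketch only: assumes the server sends a `Link: <...>; rel="next"` header.
def nextRequest(
    response: HttpResponse,
    method: HttpMethod): Option[(Option[HttpRequest], HttpResponse)] = {
  val nextUri = response
    .header[Link]                                              // modeled Link header, if present
    .flatMap(_.values.find(_.params.contains(LinkParams.next)))
    .map(_.uri)
  // Always emit the current response; the new state is the follow-up request,
  // or None when this was the last page.
  Some((nextUri.map(uri => HttpRequest(method = method, uri = uri)), response))
}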
So the general idea is to use Source.unfoldAsync to crawl through the pages and to do the HTTP requests (the idea and implementation are very close to what's described in this answer). This will create a Source[HttpResponse, _] that can then be consumed (unmarshalled to Multipart, split up into the individual parts, ...).
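
To make the wiring concrete, consuming it looks roughly like this (the start URI and the sink below are just placeholders, and the usual implicit ActorSystem and Materializer are assumed to be in scope):

import akka.http.scaladsl.model.HttpRequest

// Placeholder start URI; in reality this points at the oldest page.
val firstPage = HttpRequest(uri = "https://example.com/feed/oldest")

read(firstPage)   // Source[HttpResponse, _] that crawls page by page
  .via(parse)     // unmarshal each response and flatten it into its BodyParts
  .runForeach(part => println(part.entity)) // placeholder sink; real code would persist the parts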
My problem now is that the consumption of the HttpResponses might take a while (unmarshalling takes some time if the pages are large, and maybe there will be some database requests at the end to persist some data, ...). So I would like Source.unfoldAsync to backpressure if the downstream is slower. By default, the next HTTP request is started as soon as the previous one has finished.
So my question is: Is there some way to make Source.unfoldAsync
backpressure on a slow downstream? If not, is there an alternative that makes backpressuring possible?
I can imagine a solution that makes use of the Host-Level Client-Side API that akka-http provides, as described here, together with a cyclic graph where the response of the first request is used as input to generate the second request, but I haven't tried that yet and I'm not sure if this could work or not.
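
To make that idea slightly more concrete: the building block would be the pool flow from the host-level API (the host name and the Int correlation token below are placeholders); the cyclic part that feeds each response back in as the next request would still have to be built around it with GraphDSL, which I haven't sketched:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import scala.util.Try

implicit val system: ActorSystem = ActorSystem("crawler")
implicit val mat: ActorMaterializer = ActorMaterializer() // needed once the stream is run (Akka 2.5 / Akka HTTP 10.1 style)

// One pool flow per target host. It only pulls a new (request, token) pair from
// upstream when a connection slot is free, so it is backpressured by design.
val poolFlow: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
  Http().cachedHostConnectionPool[Int]("example.com")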
EDIT: After some days of playing around and reading the docs and some blogs, I'm not sure whether I was on the right track with my assumption that the backpressure behavior of Source.unfoldAsync is the root cause. To add some more observations:
- When the stream is started, I see several requests going out. This is no problem in the first place, as long as the resulting HttpResponse is consumed in a timely fashion (see here for a description).
- If I don't change the default response-entity-subscription-timeout, I will run into the following error (I stripped out the URLs):

      [WARN] [03/30/2019 13:44:58.984] [default-akka.actor.default-dispatcher-16] [default/Pool(shared->http://....)] [1 (WaitingForResponseEntitySubscription)] Response entity was not subscribed after 1 seconds. Make sure to read the response entity body or call discardBytes() on it. GET ... Empty -> 200 OK Chunked

  This leads to an IllegalStateException that terminates the stream:

      java.lang.IllegalStateException: Substream Source cannot be materialized more than once

- I observed that the unmarshalling of the response is the slowest part in the stream, which might make sense because the response body is a Multipart document and thereby relatively large. However, I would expect this part of the stream to signal less demand to the upstream (which is the Source.unfoldAsync part in my case), so that fewer requests are made.
- Some googling led me to a discussion about an issue that seems to describe a similar problem. They also discuss the problems that occur when a response is not processed fast enough. The associated merge request will bring documentation changes that propose to completely consume the HttpResponse before continuing with the stream. In the discussion of the issue there are also doubts about whether it's a good idea to combine Akka HTTP with Akka Streams at all. So maybe I would have to change the implementation to directly do the unmarshalling inside the function that's being called by unfoldAsync (a rough sketch of that follows below).
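
Following that last thought, an untested sketch of how the unmarshalling could move into the function passed to unfoldAsync, so that every response entity is fully consumed before the next request goes out. nextRequestFor is a placeholder for the header-based logic that builds the follow-up request (or returns None on the last page):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Multipart}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

// Placeholder: builds the request for the next page from the response headers,
// or returns None when this was the last page.
def nextRequestFor(response: HttpResponse): Option[HttpRequest] = ???

def crawlAndParse(reqOption: Option[HttpRequest])(
    implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext
): Future[Option[(Option[HttpRequest], Multipart.General.Strict)]] =
  reqOption match {
    case Some(req) =>
      for {
        response  <- Http().singleRequest(req)
        multipart <- Unmarshal(response).to[Multipart.General]
        strict    <- multipart.toStrict(10.seconds) // drains the entity completely
      } yield Some((nextRequestFor(response), strict))
    case None => Future.successful(None)
  }

// The source now emits fully read pages, so no response entity is left
// unconsumed while the downstream is still busy.
def readPages(first: HttpRequest)(
    implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext
): Source[Multipart.General.Strict, _] =
  Source.unfoldAsync[Option[HttpRequest], Multipart.General.Strict](Some(first))(crawlAndParse)

With this shape, a slow downstream simply delays the next call to crawlAndParse, and thereby the next request.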