5
votes

I'm using akka-http to make a request to an HTTP service which sends back a chunked response. This is what the relevant bit of code looks like:

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
    response.entity.dataBytes.runForeach { chunk =>
        println("-----")
        println(chunk.utf8String)
    }
}

and the output produced in the command line looks something like this:

-----
{"data":
-----
"some text"}

-----
{"data":
-----
"this is a longer
-----
text"}

-----
{"data": "txt"}

-----
...

Each logical piece of data (a JSON in this case) ends with the end-of-line sequence \r\n, but the problem is that the JSON doesn't always fit into a single HTTP response chunk, as is clearly visible in the example above.

My question is: how do I concatenate the incoming chunked data into complete JSONs so that the resulting container type still remains either Source[Out,M1] or Flow[In,Out,M2]? I'd like to follow the ideology of akka-stream.

UPDATE: It's also worth mentioning that the response is endless, so the aggregation must be done in real time.


3 Answers

4
votes

Found a solution:

val httpRequest: HttpRequest = //build the request
val request = Http().singleRequest(httpRequest)
request.flatMap { response =>
    response.entity.dataBytes
        .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
        .filter(_.contains("\r\n"))
        .runForeach { json =>
            println("-----")
            println(json)
        }
}
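For completeness, the same scan/filter combination can be kept as a reusable Flow (which is what the question asks for) instead of being run immediately. A minimal sketch, reusing the \r\n terminator from the question (the aggregateJson name is just illustrative):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// Accumulate chunks until the buffer contains the \r\n terminator, then start
// over with the next chunk; only buffers containing the terminator are emitted.
val aggregateJson: Flow[ByteString, String, NotUsed] =
  Flow[ByteString]
    .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String)
    .filter(_.contains("\r\n"))

// usage: response.entity.dataBytes.via(aggregateJson)
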
0
votes

The Akka Streams documentation has a cookbook entry for this very problem: "Parsing lines from a stream of ByteString". Its solution is quite verbose, but it can also handle the situation where a single chunk contains multiple lines. This seems more robust, since the chunk size could grow large enough to hold multiple JSON messages.
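The cookbook recipe is too long to quote here, but a minimal sketch of the same idea, using statefulMapConcat to keep leftover bytes between chunks, could look like the following (the lineParser name, the \r\n delimiter taken from the question, and the buffering details are assumptions of mine, not the cookbook's exact code):

import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.util.ByteString

// Keep a buffer of leftover bytes between chunks and emit every complete
// \r\n-terminated line found so far; a chunk may yield zero, one, or many lines.
val lineParser: Flow[ByteString, String, NotUsed] =
  Flow[ByteString].statefulMapConcat { () =>
    var buffer = ByteString.empty
    (chunk: ByteString) => {
      buffer = buffer ++ chunk
      var lines = Vector.empty[String]
      var idx = buffer.indexOfSlice(ByteString("\r\n"))
      while (idx >= 0) {
        lines = lines :+ buffer.take(idx).utf8String
        buffer = buffer.drop(idx + 2)
        idx = buffer.indexOfSlice(ByteString("\r\n"))
      }
      lines
    }
  }

// usage: response.entity.dataBytes.via(lineParser).runForeach(println)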

0
votes
response.entity.dataBytes
  // split the incoming ByteString stream into complete, newline-terminated frames
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096))
  .mapAsyncUnordered(Runtime.getRuntime.availableProcessors()) { data =>
    if (response.status == OK) {
      // Event, Unmarshal and log come from the surrounding application code
      val event: Future[Event] = Unmarshal(data).to[Event]
      event.foreach(x => log.debug("Received event: {}.", x))
      event.map(Right(_))
    } else {
      // non-OK responses are passed through downstream as plain text
      Future.successful(data.utf8String)
        .map(Left(_))
    }
  }

The only requirement is that you know the maximum size of one record. If you start with something small, the default behavior is to fail the stream if a record is larger than the limit. You can configure it to truncate instead of failing, but a partial piece of a JSON makes no sense anyway.
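For reference, a minimal sketch of the two modes mentioned above (8096 is the same arbitrary limit used in the answer; allowTruncation is the flag in question):

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

// Strict framing (the default): a frame longer than maximumFrameLength fails
// the stream with a FramingException.
val strictFraming: Flow[ByteString, ByteString, NotUsed] =
  Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096)

// allowTruncation = true emits a final frame that has no terminating delimiter
// instead of failing, but a partial JSON is rarely usable downstream.
val lenientFraming: Flow[ByteString, ByteString, NotUsed] =
  Framing.delimiter(ByteString("\n"), maximumFrameLength = 8096, allowTruncation = true)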