Here is the code I used.
When I connect with curl, all the entities show up in the curl output really fast. When I try to reproduce the same behavior with Akka, there are big pauses between printing out the elements I receive.
The stream below somehow gets backpressured: after the first 4 messages, the one remaining message reaches the println only after a noticeable delay.
The first 4 messages are around 2k of JSON each; the last one (no. 5) is about 80k of JSON.
The last entity (no. 5) is also the biggest chunk, and I get the impression it's printed just before the stream completes. I'm pretty positive it's available after only 2-3 seconds of running.
Any idea why this stream hangs after reading the first 4 elements?
```scala
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods.GET
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString
import io.circe.Json
import io.circe.parser.parse

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

val awesomeHttpReq: Future[HttpResponse] = Http().singleRequest(
  HttpRequest(
    method = GET,
    uri = Uri("http://some-service-providing-endless-http.stream")
  )
)

val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
  case HttpResponse(status, _, entity, _) =>
    // I saw some comments that backpressure might kick in
    // because I might not be consuming the bytes here properly,
    // but this is totally in line with all the examples etc.
    entity.withoutSizeLimit.dataBytes
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
}.map { bytes =>
  parse(bytes.decodeString(StandardCharsets.UTF_8))
    .fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json])
}.mapConcat { items =>
  // every line that comes in from the previous stage contains
  // the key "events"; that's what I'm interested in, and it's an array
  items.asObject.flatMap(_.toMap.get("events")).flatMap(_.asArray).getOrElse(Nil)
}

val b: Future[Vector[Json]] = a
  .takeWithin(50.seconds)
  .runWith(Sink.fold(Vector.empty[Json])((acc, json) => {
    // I'm using this to see what's going on in the stream:
    // there are significant pauses between the entities.
    // In reality all 5 elements are available in the stream
    // within 2-3 seconds, yet the printing pauses for a very
    // long time after the first 4 elements.
    println(s"adding\n\n\n ${json.noSpaces}")
    acc :+ json
  }))

Await.result(b, 1.minute)
```
I had a look at this issue, which seems really close to what I have: https://github.com/akka/akka-http/issues/57, but I couldn't find anything that helps in my case.
I also tried changing the chunk sizes for Akka HTTP; it didn't really help.
Here are the arrival times of the incoming messages, measured from stream initialization:
1. 881 ms
2. 889 ms
3. 894 ms
4. 898 ms
5. 30871 ms (I don't understand why there is a 30-second wait between messages 4 and 5)
The last message obviously hangs somewhere for about 30 seconds.
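A minimal sketch of how per-element timings like the ones above could be collected, assuming Akka Streams 2.5 (`statefulMapConcat` is a real operator there). `ArrivalTimes` and `timed` are hypothetical names, not part of Akka, and the offsets are measured from when the stream is materialized, which is roughly "stream initialization":

```scala
import akka.stream.scaladsl.Source

object ArrivalTimes {
  /** Tags each element with the milliseconds elapsed since the stream was
    * materialized, so long gaps between elements show up directly. */
  def timed[T, M](src: Source[T, M]): Source[(Long, T), M] =
    src.statefulMapConcat { () =>
      // This outer function runs once per materialization,
      // so every run of the stream gets its own start time.
      val start = System.nanoTime()
      elem => List(((System.nanoTime() - start) / 1000000L, elem))
    }
}
```

Wrapping the `a` source from the question, e.g. `ArrivalTimes.timed(a).runForeach { case (ms, json) => println(s"$ms ms") }`, would print one offset per event.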
Any ideas would really be appreciated.
Update:
Since it's really strange that the first 4 elements consistently arrive right away while the 5th one is held back for 30 seconds, I decided to increase initial-input-buffer-size from its default of 4 to 16, and now it works as expected. I simply fail to understand where the backpressure kicks in in the code above.
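For reference, a minimal sketch of raising the same buffer programmatically instead of via `akka.stream.materializer.initial-input-buffer-size = 16` in application.conf, assuming Akka Streams 2.5's `ActorMaterializerSettings.withInputBuffer` (the approach the Akka docs show for internal buffers). `InputBufferConfig` is a hypothetical helper name:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}

object InputBufferConfig {
  /** Builds a materializer whose stages start with a larger input buffer
    * than the default of 4. Code counterpart of setting
    * akka.stream.materializer.initial-input-buffer-size (and
    * max-input-buffer-size, default 16) in application.conf. */
  def materializer(system: ActorSystem, initialSize: Int = 16, maxSize: Int = 16): ActorMaterializer = {
    // Keep initialSize <= maxSize.
    val settings = ActorMaterializerSettings(system)
      .withInputBuffer(initialSize = initialSize, maxSize = maxSize)
    ActorMaterializer(settings)(system)
  }
}
```

This changes the setting for every stream run by that materializer, which matches what editing the config file does.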
Update 2:
The larger buffer fixed my simple example, but in my real code something very strange is going on:
```scala
entity.withoutSizeLimit.dataBytes
  .alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8))))
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
  .buffer(1000, OverflowStrategy.backpressure)
  .alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8))))
```
In the logs I can see the message I need before the framing (stage 1) but not after it (stage 2). And I made sure there is enough room downstream by putting a buffer right after the framing.
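Because a trailing newline is invisible in a decoded `println`, it can help to log the raw last bytes of each chunk at stage 1 as well. A minimal stdlib-only sketch; `ChunkTail` and `tailHex` are hypothetical names:

```scala
import java.nio.charset.StandardCharsets

object ChunkTail {
  /** Returns the last `n` bytes of a raw chunk as space-separated hex,
    * e.g. "7d 0a" for a chunk ending in "}\n". */
  def tailHex(chunk: Array[Byte], n: Int = 2): String =
    chunk.takeRight(n).map(b => f"${b & 0xff}%02x").mkString(" ")

  def main(args: Array[String]): Unit = {
    val complete = "{\"events\":[]}\n".getBytes(StandardCharsets.UTF_8)
    val partial  = "{\"events\":[]}".getBytes(StandardCharsets.UTF_8)
    println(tailHex(complete)) // 7d 0a
    println(tailHex(partial))  // 5d 7d
  }
}
```

Inside the stage 1 `Sink.foreach`, something like `println("stage 1 tail " + ChunkTail.tailHex(a.toArray))` would show whether each chunk actually ends with `0a`.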
Now I've found out that the newline character never actually arrives at the stage in front of the framing (stage 1). This is how every line usually ends:
```scala
scala> "7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString
res12: String =
"}
"
```
On my last item I'm missing the final byte (hex a, i.e. 0x0a). In other words, the newline never reaches the framing stage, so that item is never emitted.
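To illustrate why a missing final `\n` holds back exactly one element, here is a simplified, String-based model of newline framing. It is not Akka's actual `Framing.delimiter` implementation (which works on `ByteString`s and enforces a maximum frame length); `DelimiterFramingSketch` is a hypothetical name:

```scala
object DelimiterFramingSketch {
  /** Emits every complete line; any text after the last '\n' stays
    * buffered until more input arrives. Returns (emitted, pending). */
  def frame(chunks: Seq[String]): (Vector[String], String) =
    chunks.foldLeft((Vector.empty[String], "")) { case ((emitted, pending), chunk) =>
      val buf = pending + chunk
      val lastNewline = buf.lastIndexOf('\n')
      if (lastNewline < 0) (emitted, buf) // no delimiter yet: nothing is emitted
      else {
        // split with limit -1 keeps empty lines between consecutive '\n's
        val lines = buf.substring(0, lastNewline).split("\n", -1).toVector
        (emitted ++ lines, buf.substring(lastNewline + 1))
      }
    }

  def main(args: Array[String]): Unit = {
    // Four complete messages, then a fifth whose trailing '\n' never arrives
    val chunks = Seq("{\"n\":1}\n{\"n\":2}\n", "{\"n\":3}\n{\"n\":4}\n{\"n\":5}")
    val (frames, pending) = frame(chunks)
    println(frames.size) // 4
    println(pending)     // {"n":5}
  }
}
```

In this model the fifth message is emitted only once a chunk containing its `\n` arrives, and no amount of downstream buffering changes that. `Framing.delimiter` also takes an `allowTruncation` flag (default `false`); with `true`, leftover bytes are emitted as a final frame only when upstream completes, so it would not help while an endless stream is still open.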
Source.fromFile instead of the http request. - Frederic A.
initial-input-buffer-size = 16 and it works as expected... this is really strange, looks like backpressure is there somewhere. But can't figure out where. - Marko Švaljek