I have an akka-http route which is returning a Source containing an infinite stream of entities. How can I test that using the route testkit? I'd like to inspect just the first n elements of the stream, but I've taken a look at the testkit code, and it looks like there's no direct way to access the Source in the response. It's always converted to a sequence of ByteStrings, which in my case just causes a TimeoutException, since the stream never terminates.

For reference, the problem can be reproduced with a route looking something like this:

case class Bar(wibble: String, wobble: String)

path("stream") {
  get {
    complete {
      import JsonSupport._
      implicit val streamingSupport = EntityStreamingSupport.json()
      Source.unfold(1) { i =>
        Thread.sleep(10)
        Some((i + 1, Bar(i.toString, (i + 1).toString)))
      }
    }
  }
}
What is JsonSupport? - sarveshseri
JsonSupport is defined as: object JsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val barFormat = jsonFormat2(Bar) } - Andrew Brett
But this kind of streaming is not meant for something like an infinite stream. HTTP in its standard form must define a timeout for long-running requests, and so akka-http does. If you need an "infinite stream" then you should use WebSockets. - sarveshseri

1 Answer

It looks like whenever you access the response and its entity within the boundaries of the testkit, the testkit is going to try to extract the entity fully.

You can check out the private method awaitAllElements in the RouteTestResultComponent class for further insight.

You could try simply using akka-streams combinators instead. I reckon it wouldn't bloat your test code too much. Example below:

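import akka.http.scaladsl.server.RouteResult
import akka.stream.scaladsl.{Sink, Source}

// Assumes yourRequest, route, the implicits route2HandlerFlow needs, and ScalaFutures (for futureValue) are in scope.
def firstNElements(n: Int) = Source.single(yourRequest)
  .via(RouteResult.route2HandlerFlow(route)) // run the route as a flow from request to response
  .flatMapConcat(_.entity.dataBytes)         // stream the response body as ByteString chunks
  .take(n)                                   // complete after n chunks (not necessarily n entities)
  .runWith(Sink.seq)
  .futureValue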

// assertions
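
The reason this terminates where the testkit does not is that take(n) completes the stream after n elements and cancels the upstream, so the infinite Source is never drained. A minimal sketch of just that behaviour, without any HTTP involved, might look like the following. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream); the names TakeSketch and firstN are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object TakeSketch {
  // Hypothetical helper: returns the first n elements of an infinite source.
  // Assumption: Akka 2.6+, so the implicit ActorSystem supplies the materializer.
  def firstN(n: Int)(implicit system: ActorSystem): Seq[Int] = {
    // Infinite source shaped like the one in the question, minus the sleep.
    val infinite = Source.unfold(1)(i => Some((i + 1, i)))

    // take(n) completes after n elements and cancels the infinite upstream,
    // so Sink.seq receives a finite stream and its Future completes.
    Await.result(infinite.take(n).runWith(Sink.seq), 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("take-sketch")
    try println(firstN(3))
    finally system.terminate()
  }
}
```

Keep in mind that in the route-based version take(n) counts ByteString chunks of the response body, so you may need to take a few more than n and parse them before asserting on individual entities.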