2
votes

I am trying to test an endpoint that I write using Akka HTTP. This endpoint transform a source of something into a streaming HTTP response. I would like to be able to test timing of this response, for example, I would like to express something such as:

Open a request on /events/
If no events, I should have no line on the body of that opened http connection
Make a event happen in my application:
Now I should have something on that connection

Here is a minimal code example of what I have in mind:

  • the directive under test:
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
import akka.http.scaladsl.model.{ContentTypes}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.duration._

object StupidStreamDirectives extends Directives {
  implicit val entityStreamingSupport = EntityStreamingSupport.csv()
  implicit val stringMarshaller = Marshaller.strict[String, ByteString] {
    string => Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => ByteString(string))
  }

  def route = pathPrefix("stream") {
    pathEndOrSingleSlash {
      get {
        parameter("take", "wait_in_ms") { (take, waitInMs) =>
          complete(Source.repeat[String]("hello").take(take.toInt).throttle(1, waitInMs.toInt.millisecond))
        }
      }
    }
  }
}
  • my test tentative:
import java.util.concurrent.TimeoutException

import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{MediaRange, MediaTypes, StatusCodes}
import akka.http.scaladsl.testkit.{ ScalatestRouteTest}
import akka.stream.scaladsl.{Framing, Sink}
import akka.util.ByteString
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration.DurationInt

class StupidStreamDirectivesTest extends FlatSpec with
  ScalatestRouteTest with
  Matchers with
  ScalaFutures {
  test =>

  implicit val entityStreamingSupport = EntityStreamingSupport.csv()
  val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))

  "stupidStream" should "stream only one line if take=1" in {
    Get("/stream?take=1&wait_in_ms=1").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
      status shouldEqual StatusCodes.OK
      responseAs[String] shouldEqual "hello\n"
    }
  }

  it should "stream only three lines if take=3" in {
    Get("/stream?take=3&wait_in_ms=1").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
      status shouldEqual StatusCodes.OK
      responseAs[String] shouldEqual "hello\n".repeat(3)
    }
  }

  it should "not produce line too fast" in {
    Get("/stream?take=4&wait_in_ms=100").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
      status shouldEqual StatusCodes.OK
      response
        .entity
        .dataBytes
        //.throttle(1, 100.millisecond) /* this line make the test work */
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
        .map(_.utf8String)
        .takeWithin(250.millisecond)
        .runWith(Sink.seq).futureValue(Timeout(1.second)) shouldEqual List.fill(3)("hello") :+ "end"
    }
  }
}

Last test will fail, what happen is that ~>check will wait for the http request to be totally done before performing any check. Therefore response.entity.dataBytes do not reflect the timing of the http response and I guess is a streaming of the cached result (which is therefore super fast).

Also, one the last test, If I make the total time of the response exceed one second. My test will fail before reaching any test because check will timeout. This mean it is impossible to test a long polling http response.

To what I saw reading Akka doc/code, is that this kind of testing is not really possible using Akka http testkit. If someone know a way to do that I would like to heard about it.

Otherwise, one could say I could do a start starting a real Akka http server... But I think this would be sad.

1
Could you clarify more around what you're trying to test in your real use case? Is it streaming some CSV file as you read it, or are you trying to long poll and just return one value as soon as it becomes available, or something else?kag0
The content of the stream does not matter at all, it is just for example. The idea is too be able to test: * long polling http response content before the full request complete * timing between chunk of responseLuc DUZAN
In the case of long polling, you should test the timing before the response line and headers come back, rather than the time between the headers ending and the content arriving. In general the behavior of streaming bodies is not reliable between client and server in HTTP. Chunks can be combined or split by intermediaries, chunked bodies can be converted to whole bodies, etc. Look up "hop-by-hop" headers. Websockets or HTTP/2 may be a better fit if you need framing for chunks inside your response.kag0

1 Answers

0
votes

You are probably looking for the runRoute helper

Its documentation says :

A dummy that can be used as ~> runRoute to run the route but without blocking for the result. The result of the pipeline is the result that can later be checked with check. See the "separate running route from checking" example from ScalatestRouteTestSpec.scala.

The separate running route from checking example is:

"separation of route execution from checking" in {
  val pinkHeader = RawHeader("Fancy", "pink")

  case object Command
  val service = TestProbe()
  val handler = TestProbe()
  implicit def serviceRef = service.ref
  implicit val askTimeout: Timeout = 1.second

  val result =
    Get() ~> pinkHeader ~> {
      respondWithHeader(pinkHeader) {
        complete(handler.ref.ask(Command).mapTo[String])
      }
    } ~> runRoute

  handler.expectMsg(Command)
  handler.reply("abc")

  check {
    status shouldEqual OK
    responseEntity shouldEqual HttpEntity(ContentTypes.`text/plain(UTF-8)`, "abc")
    header("Fancy") shouldEqual Some(pinkHeader)
  }(result)
}

your sample could then be written as something akin to :

    it should "not produce line too fast" in {
  Get("/stream?take=4&wait_in_ms=100").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> runRoute
  check {
     status shouldEqual StatusCodes.OK
     response
       .entity
       .dataBytes

       .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
       .map(_.utf8String)
       .takeWithin(250.millisecond)
       .runWith(Sink.seq).futureValue(Timeout(1.second)) shouldEqual List.empty)
   }
 check {
    status shouldEqual StatusCodes.OK
    response
      .entity
      .dataBytes
      .throttle(1, 100.millisecond) /* this line make the test work */
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
      .map(_.utf8String)
      .takeWithin(250.millisecond)
      .runWith(Sink.seq).futureValue(Timeout(1.second)) shouldEqual List.fill(3)("hello") :+ "end"
  }
}

I did not compile or run the example and I am not entirely sure you can actuall have two checks to the exact form of your test will have to be adapted. If I can find some time to run and fix the sample I'll update my answer.