I am trying to test an endpoint that I write using Akka HTTP. This endpoint transform a source of something into a streaming HTTP response. I would like to be able to test timing of this response, for example, I would like to express something such as:
Open a request on /events/
If no events, I should have no line on the body of that opened http connection
Make a event happen in my application:
Now I should have something on that connection
Here is a minimal code example of what I have in mind:
- the directive under test:
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling.{Marshaller, Marshalling}
import akka.http.scaladsl.model.{ContentTypes}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.duration._
object StupidStreamDirectives extends Directives {
implicit val entityStreamingSupport = EntityStreamingSupport.csv()
implicit val stringMarshaller = Marshaller.strict[String, ByteString] {
string => Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => ByteString(string))
def route = pathPrefix("stream") {
pathEndOrSingleSlash {
get {
parameter("take", "wait_in_ms") { (take, waitInMs) =>
complete(Source.repeat[String]("hello").take(take.toInt).throttle(1, waitInMs.toInt.millisecond))
- my test tentative:
import java.util.concurrent.TimeoutException
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{MediaRange, MediaTypes, StatusCodes}
import akka.http.scaladsl.testkit.{ ScalatestRouteTest}
import akka.stream.scaladsl.{Framing, Sink}
import akka.util.ByteString
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}
import scala.concurrent.duration.DurationInt
class StupidStreamDirectivesTest extends FlatSpec with
ScalatestRouteTest with
Matchers with
ScalaFutures {
test =>
implicit val entityStreamingSupport = EntityStreamingSupport.csv()
val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))
"stupidStream" should "stream only one line if take=1" in {
Get("/stream?take=1&wait_in_ms=1").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual "hello\n"
it should "stream only three lines if take=3" in {
Get("/stream?take=3&wait_in_ms=1").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual "hello\n".repeat(3)
it should "not produce line too fast" in {
Get("/stream?take=4&wait_in_ms=100").addHeader(AcceptCsv) ~> StupidStreamDirectives.route ~> check {
status shouldEqual StatusCodes.OK
//.throttle(1, 100.millisecond) /* this line make the test work */
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.runWith(Sink.seq).futureValue(Timeout(1.second)) shouldEqual List.fill(3)("hello") :+ "end"
Last test will fail, what happen is that ~>check
will wait for the http request to be totally done before performing any check.
Therefore response.entity.dataBytes
do not reflect the timing of the http response and I guess is a streaming of the cached result (which is therefore super fast).
Also, one the last test, If I make the total time of the response exceed one second. My test will fail before reaching any test because check
will timeout. This mean it is impossible to test a long polling http response.
To what I saw reading Akka doc/code, is that this kind of testing is not really possible using Akka http testkit. If someone know a way to do that I would like to heard about it.
Otherwise, one could say I could do a start starting a real Akka http server... But I think this would be sad.