
Basically here is the code that I used.

When I make the connection with curl I see all the entities arrive really fast. When I try to emulate the same behaviour with akka-http there are big pauses between printing out the elements I get.

The stream below somehow gets backpressured: after the first 4 messages, the remaining message reaches the print line only after a noticeable delay.

The first 4 messages are around 2 kB of JSON each; the last one, no. 5, is 80 kB.

The last entity (number 5) is also the biggest chunk, and I get the impression it's printed just before the stream completes. I'm pretty positive it's available on the wire after only 2-3 seconds of running.

Any idea why this stream just hangs after reading the first 4 elements?

import java.nio.charset.StandardCharsets

import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods.GET
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString
import io.circe.Json
import io.circe.parser.parse

// assumes an implicit ActorSystem and ActorMaterializer in scope

val awesomeHttpReq = Http().singleRequest(
  HttpRequest(
    method = GET,
    uri = Uri("http://some-service-providing-endless-http.stream")
  )
)

val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
  case HttpResponse(status, _, entity, _) =>
    // I saw some comments that backpressure might kick in
    // because I might not be consuming the bytes here properly,
    // but this is totally in line with all the examples etc.

    entity.withoutSizeLimit.dataBytes
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
} map { bytes =>
  parse(bytes.decodeString(StandardCharsets.UTF_8))
    .fold(pf => throw new IllegalStateException(s"unable to parse: $pf"), identity[Json])
} mapConcat { items =>
  // every line that comes in from the previous stage contains
  // an "events" key - that's what I'm interested in; it's an array
  items.asObject.flatMap(_.toMap.get("events")).flatMap(_.asArray).getOrElse(Nil)
}

val b: Future[Vector[Json]] = a
  .takeWithin(50.seconds)
  .runWith(Sink.fold(Vector.empty[Json])((acc, json) => {

    // I'm using this to see what's going on in the stream.
    // There are significant pauses between the entities;
    // in reality the elements are available in the stream (all 5)
    // within 2-3 seconds, and this printing just has a very big
    // pause after the first 4 elements.

    println(s"adding\n\n\n ${json.noSpaces}")
    acc :+ json
  }))

Await.result(b, 1.minute)

I had a look at this issue, which seems really close to what I have (https://github.com/akka/akka-http/issues/57), but I somehow fail to find anything helpful for my case.

I also tried changing the chunk sizes for akka-http; it didn't really help.

Here are the timings of the incoming messages, measured from stream initialization:

1.  881 ms
2.  889 ms
3.  894 ms
4.  898 ms
// I don't understand the 30-second wait here
5. 30871 ms

The last message obviously hangs somewhere for 30 seconds.

Any ideas would really be appreciated.

Update:

Since it's really strange that the first 4 elements consistently come out right away while the 5th one is waited on for 30 seconds, I decided to increase initial-input-buffer-size from the default 4 to 16, and now it works as expected. I just fail to understand where the backpressure kicks in in the code above.
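For reference, here is a minimal sketch of how that setting can be applied, either via `application.conf` or programmatically through `ActorMaterializerSettings` (the object name `BufferSettingsDemo` is just for illustration):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}

object BufferSettingsDemo extends App {
  implicit val system = ActorSystem("demo")

  // Programmatic equivalent of putting
  //   akka.stream.materializer.initial-input-buffer-size = 16
  // into application.conf. maxSize must be >= initialSize;
  // both must be powers of two.
  implicit val materializer = ActorMaterializer(
    ActorMaterializerSettings(system).withInputBuffer(initialSize = 16, maxSize = 64)
  )

  println(materializer.settings.initialInputBufferSize) // 16

  system.terminate()
}
```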

Update 2:

The buffer size helped with my simple example, but in my real problem something very strange is going on:

entity.withoutSizeLimit.dataBytes
    .alsoTo(Sink.foreach(a => println("stage 1 " + a.decodeString(StandardCharsets.UTF_8))))
    .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
    .buffer(1000, OverflowStrategy.backpressure)
    .alsoTo(Sink.foreach(a => println("stage 2 " + a.decodeString(StandardCharsets.UTF_8))))

In the logs I can see the message I need before the framing (stage 1) but not after it (stage 2). And I made sure there is enough downstream demand by putting a buffer after the framing.

Now I've found out that for the last item the newline character never actually arrives at the first stage (stage 1). This is how every line usually ends (hex 7d 0a, i.e. "}" followed by "\n"):

"7da".sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toChar).mkString
res12: String =
"}
"

On my last item the final byte (the 0a) is missing: the newline simply never reaches the framing stage, so the frame is never emitted.
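This behaviour of `Framing.delimiter` can be seen in isolation. The stage buffers bytes until it sees the delimiter, so a frame without a trailing `\n` is never emitted while the stream is alive; the `allowTruncation` flag only releases the dangling frame when the upstream *completes*, which doesn't help an endless stream. A minimal sketch (object and payload names are mine):

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object TruncationDemo extends App {
  implicit val system = ActorSystem("demo")
  implicit val materializer = ActorMaterializer()

  // The last "line" has no trailing '\n', just like the last event above.
  val input = Source.single(ByteString("{\"a\":1}\n{\"b\":2}\n{\"c\":3}"))

  // With allowTruncation = true the dangling final frame is emitted when
  // the source completes; with the default (false) the stream instead
  // fails with a FramingException on completion.
  val frames = input
    .via(Framing.delimiter(ByteString("\n"), Int.MaxValue, allowTruncation = true))
    .map(_.utf8String)
    .runWith(Sink.seq)

  println(Await.result(frames, 5.seconds)) // Vector({"a":1}, {"b":2}, {"c":3})

  system.terminate()
}
```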

Interesting, I'm wondering if you could reproduce this without akka-http i.e., dump some of your source JSON to a file, and use Source.fromFile instead of the http request. - Frederic A.
When I just dump from curl it works. Also I tried now initial-input-buffer-size = 16 and it works as expected... this is really strange, looks like backpressure is there somewhere. But can't figure out where. - Marko Švaljek
Tried with file as a stream, used the same code as here. I don't run into this issue :( - driving me a little crazy right now :D - Marko Švaljek

1 Answer


After quite some investigation I decided to work around the issue, because it looks like a combination of multiple factors is at play. The input source for the whole question is actually a proprietary enterprise service bus with Kafka in the background that my company uses: https://github.com/zalando/nakadi.

From the symptoms above I was thinking that perhaps the system is not playing according to the docs and that they might not be appending the \n but prepending it to every line, but that is just not the case, as I verified in the code: https://github.com/zalando/nakadi/blob/0859645b032d19f7baa919877f72cb076f1da867/src/main/java/org/zalando/nakadi/service/EventStreamWriterString.java#L36

After seeing this I tried to simulate the whole thing by using this example:

build.sbt

name := "test-framing"

version := "0.1"

scalaVersion := "2.12.4"    

lazy val akkaVersion = "2.5.6"
lazy val akkaHttpVersion = "10.0.10"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion
)

scalacOptions in Compile += "-Yrangepos"

*TestApp.scala - where I had the problem in my code*

import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object TestApp extends App {

  implicit val system = ActorSystem("MyAkkaSystem")
  implicit val materializer = ActorMaterializer()

  val awesomeHttpReq = Http().singleRequest(
    HttpRequest(
      method = HttpMethods.GET,
      uri = Uri("http://localhost:9000/streaming-json")
    )
  )

  val a = Source.fromFuture(awesomeHttpReq).flatMapConcat {
    case HttpResponse(status, _, entity, _) =>
      entity.withoutSizeLimit.getDataBytes
        .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
  } map { bytes =>
    bytes.decodeString(StandardCharsets.UTF_8)
  }

  val b: Future[Vector[String]] = a
    .takeWithin(50.seconds)
    .runWith(Sink.fold(Vector.empty[String])((acc, line) => {
      println(s"adding $line")
      acc :+ line
    }))

  Await.result(b, 1.minute)

}

*The simulation endpoint*

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.util.ByteString
import spray.json._

import scala.concurrent.duration._
import scala.io.StdIn

object TestApp2 extends App {

  implicit val system = ActorSystem("MyAkkaSystem")
  implicit val materializer = ActorMaterializer()

  implicit val executionContext = system.dispatcher

  case class SomeData(name: String)

  trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
    implicit val someFormat = jsonFormat1(SomeData)
  }

  val start = ByteString.empty
  val sep = ByteString("\n")
  val end = ByteString.empty

  implicit val jsonStreamingSupport = EntityStreamingSupport
    .json()
    .withFramingRenderer(Flow[ByteString].intersperse(sep))

  object MyJsonService extends Directives with JsonSupport {

    def streamingJsonRoute =
      path("streaming-json") {
        get {
          val sourceOfNumbers = Source(1 to 1000000)

          val sourceOfDetailedMessages =
            sourceOfNumbers
              .map(num => SomeData(s"Hello $num"))
              .throttle(elements = 5,
                        per = 30.seconds,
                        maximumBurst = 6,
                        mode = ThrottleMode.Shaping)

          complete(sourceOfDetailedMessages)
        }
      }
  }

  val bindingFuture =
    Http().bindAndHandle(MyJsonService.streamingJsonRoute, "localhost", 9000)

  println(s"Server online at http://localhost:9000/\nPress RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done

}

With the simulation endpoint I got exactly the behavior I expected, so nothing is really wrong with akka itself.

There still might be some issue when multiple libraries and nakadi are brought together, but chasing that further is a wild-goose chase. In the end, if I lower batch_flush_timeout to some low value, the server actually sends the next event into the pipeline, and the message that was previously stuck gets pushed up to my application layer so that I can process it.

Basically all of this text exists because of one single byte that somehow never makes it into the framing stage. Then again, I learned a lot about akka streams in the past few days.