I am trying to test the throughput of Akka Streams
and see how well it scales as the amount of requests increases.
The problem I'm currently facing is that the stream
isn't working concurrent. My stream
consists of flows
that each sleep for a second to simulate functionality. What happens is that for each element passed through the stream, the flow will deal with it synchronously. I want this too happen asynchronous to optimize my performance.
This is the code I'm using:
// Flow that's being used
def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
Flow[TestObject].map { s ⇒
println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
println(name + " finished processing " + s)
s
}
// Stream
def startStream() = {
val completion = Source[TestObject](list.toList)
.via(processingStage("A")).async
.via(processingStage("B")).async
.runWith(Sink.foreach(s ⇒ println("Got output " + s)))
}
.async
at the end of stage flow. Also try usingmapAsync(4)
instead. – expert