0
votes

I am trying to test the throughput of Akka Streams and see how well it scales as the number of requests increases.

The problem I'm currently facing is that the stream isn't running concurrently. My stream consists of flows that each sleep for a second to simulate work. What happens is that, for each element passed through the stream, the flows deal with it synchronously. I want this to happen asynchronously to improve performance.

This is the code I'm using:

// Flow that's being used
def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
    Flow[TestObject].map { s ⇒
      println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
      Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
      println(name + " finished processing " + s)
      s
    }

// Stream
def startStream() = {
  val completion = Source[TestObject](list.toList)
    .via(processingStage("A")).async
    .via(processingStage("B")).async
    .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
}
3
Too bad people don't post a self-contained example we could run. But I'd try adding .async at the end of the stage flow. Also try using mapAsync(4) instead. – expert
They always tell you not to use Thread.sleep in an actor; create a new actor and schedule a task instead. Next, use mapAsyncUnordered if you do not care about order, as it avoids the overhead associated with mapAsync, and, as stated, create async boundaries (split tasks to different actors for possibly concurrent processing). Cheers – Andrew Scott Evans
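For illustration, here is a minimal, self-contained sketch of the mapAsyncUnordered variant suggested in that comment (the object name and the process helper are assumptions for the example, not code from the question):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object MapAsyncUnorderedSketch extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  def process(s: String): Future[String] = Future {
    Thread.sleep(1000) // stand-in for real asynchronous work
    s
  }

  // Up to 4 elements are in flight at once; results are emitted as soon as they
  // complete, so downstream order may differ from upstream order.
  Source(List("a", "b", "c", "d"))
    .mapAsyncUnordered(4)(process)
    .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
}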

3 Answers

0
votes

Don't use Thread.sleep(1000) to simulate delays—use the time-based combinators. Also, if you want to force concurrency of multiple stages within the same stream, use the .async demarcation. See the documentation for more details.
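A minimal sketch of that advice, assuming a plain ActorSystem/ActorMaterializer setup (the object name is illustrative, and the delays stand in for real processing time):

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

object AsyncBoundarySketch extends App {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  Source(1 to 4)
    .delay(1.second, DelayOverflowStrategy.backpressure) // time-based stand-in for stage A's work
    .map { i ⇒ println("A done with " + i); i }
    .async                                               // boundary: stage B runs concurrently with stage A
    .delay(1.second, DelayOverflowStrategy.backpressure) // time-based stand-in for stage B's work
    .map { i ⇒ println("B done with " + i); i }
    .runWith(Sink.foreach(i ⇒ println("Got output " + i)))
}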

0
votes

Streams are serial by default. If you want elements in the stream to be processed in parallel, you must request it directly.

The docs describe one method to achieve this: http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers

Here it is added to your code:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}

object q40545440 {

  def main(args: Array[String]): Unit = {

    implicit val sys = ActorSystem()
    implicit val mat = ActorMaterializer()

    case class TestObject(x: String)

    // Flow that's being used
    def processingStage(name: String): Flow[TestObject, TestObject, NotUsed] =
      Flow[TestObject].map { s ⇒
        println(name + " started processing " + s + " on thread " + Thread.currentThread().getName)
        Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
        println(name + " finished processing " + s)
        s
      }

    // Stream to a parallel processing pool of workers
    // See http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Balancing_jobs_to_a_fixed_pool_of_workers
    def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
      import GraphDSL.Implicits._

      Flow.fromGraph(GraphDSL.create() { implicit b =>
        val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
        val merge = b.add(Merge[Out](workerCount))

        for (_ <- 1 to workerCount) {
          // for each worker, add an edge from the balancer to the worker, then wire
          // it to the merge element
          balancer ~> worker.async ~> merge
        }

        FlowShape(balancer.in, merge.out)
      })
    }

    def startStream(list: List[TestObject]) = {
      Source[TestObject](list)
        .via(balancer(processingStage("A"), 5))
        .via(balancer(processingStage("B"), 5))
        .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
    }

    val completion = startStream(List(
      TestObject("a"),
      TestObject("b"),
      TestObject("c"),
      TestObject("d")))

    // Terminate the actor system only once the stream has completed; terminating
    // immediately would shut things down before all elements are processed.
    completion.onComplete(_ ⇒ sys.terminate())(sys.dispatcher)
  }
}
0
votes

Here is a slightly updated version of your code:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Test extends App {

  // An ActorSystem and materializer are needed to run the stream
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // Flow that's being used
  def processingStage(name: String): Future[String] = Future {
    println(name + " started processing " + name + " on thread " + Thread.currentThread().getName)
    Thread.sleep(1000) // Simulate long processing *don't sleep in your real code!*
    println(name + " finished processing " + name)
    name
  }

  // Stream
  def startStream() = {
    val parallelism = 10 //max number of parallel instances
    val list = (1 to 1000000).toList.map(_.toString) //sample input 

    Source[String](list)
      .mapAsync(parallelism)(processingStage) //handles the items concurrently 
      .runWith(Sink.foreach(s ⇒ println("Got output " + s)))
  }

  startStream()
}
  • First, convert your processingStage function into a method that returns a Future. By doing this, you can better simulate concurrent tasks (see the sketch after this list for a note on blocking inside a Future).

  • Second, use the mapAsync method to introduce concurrency into the stage. As I understand it, this is exactly the feature you are looking for.
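
One extra caveat, shown as a hedged sketch rather than as part of the answer above: if the Future really does block (as Thread.sleep does here), wrapping the blocking call in scala.concurrent.blocking hints the global execution context to spawn extra threads so the mapAsync stage is not starved. This is a drop-in variant of the processingStage above:

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

def processingStage(name: String): Future[String] = Future {
  blocking {           // mark the blocking section for the execution context
    Thread.sleep(1000) // stand-in for blocking I/O or a slow computation
  }
  name
}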