1
votes

I'm starting to learn Akka Streams and am running the first example from here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-rate.html#stream-rate-scala

import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

object Main extends App {

  implicit val system = ActorSystem("TestSystem")
  implicit val mat = ActorMaterializer()

  Source(1 to 10)
  .map { i => println(s"A: $i"); i }
  .map { i => println(s"B: $i"); i }
  .map { i => println(s"C: $i"); i }
  .runWith(Sink.ignore)

}

Based on the example, the output should be non-deterministic like this:

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

But when I run it, it never starts processing the next element until the previous is fully processed.

A: 1
B: 1
C: 1
A: 2
B: 2
C: 2
A: 3
B: 3
C: 3
A: 4
B: 4
C: 4
A: 5
B: 5
C: 5
A: 6
B: 6
C: 6
A: 7
B: 7
C: 7
A: 8
B: 8
C: 8
A: 9
B: 9
C: 9
A: 10
B: 10
C: 10

I also tried adding a delay into each stage/map (with Thread.sleep), and only one thing was processed at a time, as if the actor system only has one thread. I was able to confirm that the Akka dispatcher has enough threads.

  import system.dispatcher
  val start = System.currentTimeMillis()

  ( 1 to 10 ).map { i => Future { Thread.sleep( 1000 ); println( s"Finished ${i} after ${System.currentTimeMillis() - start}ms" ) }

Output:

Finished 5 after 1004ms
Finished 2 after 1004ms
Finished 6 after 1005ms
Finished 8 after 1004ms
Finished 4 after 1004ms
Finished 9 after 1005ms
Finished 7 after 1004ms
Finished 3 after 1005ms
Finished 1 after 1006ms
Finished 10 after 1009ms

Does something need to be tweaked to get the stages to process concurrently?

I'm using Akka Streams 2.4.2 and Java 1.8.0_65-b17.

1

1 Answers

5
votes

I think, that you're observing an Operator Fusion, which means that all three map operations are being executed on the same Actor.

The fusable elements are for example:

  • all GraphStages (this includes all built-in junctions apart from groupBy)
  • all Stages (this includes all built-in linear operators)
  • TCP connections

You can either disable it either by passing configuration parameter akka.stream.materializer.auto-fusing=off which disables it completely (I'm not sure its a good idea in general) or you can disable it in your code by adding async boundaries (see the link I've attached for more details). Everything in a single async boundary gets executed on a single actor.

Source(1 to 10)
  .map { i => println(s"A: $i"); i }.async
  .map { i => println(s"B: $i"); i }.async
  .map { i => println(s"C: $i"); i }.async
  .runWith(Sink.ignore)