0
votes

I'm new to akka-stream and would like to know how to reproduce the behavior presented in this article: http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0.2/scala/stream-rate.html

For the given code:

Source(1 to 3)
  .map { i => println(s"A: $i"); i }
  .map { i => println(s"B: $i"); i }
  .map { i => println(s"C: $i"); i }
  .runWith(Sink.ignore)

I would like to get output similar to this:

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

I've tried adding some random Thread.sleep calls and creating a stream from an infinite iterator, but according to the debug output Akka always uses the same thread for processing.

So the question is: how can I reproduce async behavior (every stage should run asynchronously) using akka-stream?

1
What do you mean by async? What behavior do you expect? - 1esha
Anything except this: A: 1 B: 1 C: 1 A: 2 B: 2 C: 2 A: 3 B: 3 C: 3 - alatom
Then do everything in one map step? - 1esha

1 Answer

2
votes

The reason you are seeing sequential operations is that all of your operations hang off the same Source and therefore sit within the same asynchronous boundary. To get the "async behavior" you are looking for, you need to add Flows:

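import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

// The materializer needs an implicit ActorSystem to create its actors.
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()

// Each stage is a separate Flow attached with via.
Source(1 to 3).via(Flow[Int].map { i => println(s"A: $i"); i })
              .via(Flow[Int].map { i => println(s"B: $i"); i })
              .via(Flow[Int].map { i => println(s"C: $i"); i })
              .runWith(Sink.ignore)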

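Each Flow will materialize into a separate Actor, provided the stages are not fused. From Akka 2.4.2 onward operator fusion is on by default, so a chain of stages runs inside a single actor unless a stage is marked with .async. Note: to get true concurrency, the thread pool that the ActorSystem runs on must have more than one thread.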

One thing to remember: the benefit of the ActorSystem is that it assumes responsibility for low-level control of operations so that the developer can focus on the "business logic". This can also be a drawback. Depending on your ActorSystem, JVM, and hardware configuration, the stages may still run one after another rather than interleaved.
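
If the stages still print strictly in A, B, C order, one thing worth trying is to mark the asynchronous boundaries explicitly. The sketch below assumes Akka 2.4.2 or later, where the .async method is available on stages. The object name AsyncBoundaries and the log helper are made up for illustration and are not part of the original code. Printing the thread name next to each element makes it easier to see whether the stages really run on different threads; the exact interleaving depends on the dispatcher and the machine.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

// Hypothetical demo object, not from the original post.
object AsyncBoundaries extends App {
  implicit val system: ActorSystem = ActorSystem("async-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // execution context for the completion callback

  // Hypothetical helper: prints the stage label, the element and the current thread.
  def log(stage: String)(i: Int): Int = {
    println(s"$stage: $i (${Thread.currentThread.getName})")
    i
  }

  Source(1 to 3)
    .map(log("A")).async // async boundary after stage A
    .map(log("B")).async // async boundary after stage B
    .map(log("C"))
    .runWith(Sink.ignore)
    .onComplete(_ => system.terminate()) // shut down once the stream has finished
}
```

With only three elements the stream may finish before any interleaving becomes visible, so a longer range such as 1 to 100 tends to show the effect more clearly.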