My Akka Streams pipeline prints output without take(2), but with take(2) it prints nothing. What difference does take() make?

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits._

object TestStream {
    implicit val system = ActorSystem("MyTest")
    implicit val Matt = ActorMaterializer()

    def mainflow():Unit ={
        val grp = RunnableGraph.fromGraph(GraphDSL.create() {
        implicit builder =>
        import GraphDSL.Implicits._
        val src:Outlet[Int] = builder.add(Source(1 to 10)).out
        val flw1:FlowShape[Int, Int] = builder.add(Flow[Int].map(x => x*10).take(2))
        val flw2:FlowShape[Int, Int] = builder.add(Flow[Int].filter(x => x > 50))
        val snk2:Inlet[Any] = builder.add(Sink.foreach(println)).in
        src ~> flw1 ~> flw2 ~> snk2
        ClosedShape
        }).run
    }
    def main(str:Array[String]):Unit = {
        mainflow()
    }
}

For reference, here is the test run without take(2), followed by its output:

spark\bin\spark-submit --class TestStream --master local[2] --jars config-1.2.1.jar,akka-actor_2.11-2.4.2.jar,reactive-streams-1.0.0.jar,akka-stream_2.11-2.4.2.jar target\scala-2.11\simple-project_2.11-1.0.jar
60
70
80
90
100

What did you do to troubleshoot this? Some intermediate logging of values would seem to be indicated. - The Archetypal Paul

1 Answer

The graph is working as it should. Think about what's happening here:

  1. you generate a lazy source of ints: [1, 2, 3, ... 10]
  2. you multiply them by 10, so your stream is [10, 20, 30 ... 100]
  3. you take the first two elements and complete the stream, ignoring the rest: [10, 20]
  4. you filter the stream for those greater than 50, which excludes 10 and 20: []

Therefore you print nothing.
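
To check those four steps by hand, here is a minimal sketch that uses plain Scala collections rather than the Akka graph and records what each stage holds. The object name TraceStages and the stage labels are made up for illustration; they are not part of the question's code.

```scala
object TraceStages {
  // Mirrors the stages of the question's graph with strict Lists
  // (an assumption for illustration; the real graph is an Akka stream).
  def stages(): Seq[(String, List[Int])] = {
    val source   = (1 to 10).toList      // step 1: the source elements
    val scaled   = source.map(_ * 10)    // step 2: multiply by 10
    val taken    = scaled.take(2)        // step 3: keep only the first two
    val filtered = taken.filter(_ > 50)  // step 4: keep values above 50
    Seq(
      "source"   -> source,
      "map"      -> scaled,
      "take(2)"  -> taken,
      "filter"   -> filtered
    )
  }

  def main(args: Array[String]): Unit =
    stages().foreach { case (name, values) => println(s"$name: $values") }
}
```

The last two entries are List(10, 20) and List(), which is why nothing ever reaches the sink.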

If you remove the take(2), all 10 elements reach the filter, and the five that are greater than 50 pass through to the sink, which prints:

60
70
80
90
100

It's essentially the same as running this in the Scala console, except that the stream evaluates its elements lazily:

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println)
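
If the intent was to print the first two values greater than 50, the order of the two stages probably needs to be swapped so that the filter runs before take(2). The sketch below compares both orderings on a lazy Iterator instead of an Akka Source. That swap is an assumption about what was intended, and the object and method names are hypothetical.

```scala
object TakeOrder {
  // Same order as the question: take(2) keeps 10 and 20, then the filter drops both.
  def takeThenFilter: List[Int] =
    (1 to 10).iterator.map(_ * 10).take(2).filter(_ > 50).toList

  // Swapped order: the filter runs first, then take(2) keeps the first two survivors.
  def filterThenTake: List[Int] =
    (1 to 10).iterator.map(_ * 10).filter(_ > 50).take(2).toList

  def main(args: Array[String]): Unit = {
    println(takeThenFilter) // prints List()
    println(filterThenTake) // prints List(60, 70)
  }
}
```

In the graph from the question, the equivalent change would be to move take(2) from flw1 to the end of flw2, for example Flow[Int].filter(x => x > 50).take(2).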