0
votes

I'm pretty new to Akka Streams. I've been working with Rx for a while, so I know the operators pretty well, but I can't figure out why my pipeline does not emit any values.

Here is my code:

  @Test def mainFlow(): Unit = {
    val increase = Flow[Int]
      .map(value => value * 10)
    val filterFlow = Flow[Int]
      .filter(value => value > 50)
      .take(2)
    Source(0 to 10)
      .via(increase)
      .via(filterFlow)
      .to(Sink.foreach(value => println(s"Item emitted:$value")))
      .run()
  }

The first Flow transforms the values emitted by the Source by multiplying them by 10. The second Flow keeps only the items greater than 50 and then takes just 2 of them, so I was expecting the Sink to receive 60 and 70. But nothing is emitted.

Any idea why?

2 Answers

3
votes

Your flow is built correctly and emits the 2 elements you mentioned. I believe the problem is with your test: the flow runs asynchronously, and your test is a plain Unit procedure, so the test does not wait for the flow to finish running.

You will need to introduce some synchronization in your test to perform your assertions. One way to do it is to use the ScalaFutures trait from ScalaTest, which offers you a futureValue method.

val increase = Flow[Int]
  .map(value => value * 10)
val filterFlow = Flow[Int]
  .filter(value => value > 50)
  .take(2)
Source(0 to 10)
  .via(increase)
  .via(filterFlow)
  .runForeach(value => println(s"Item emitted:$value"))
  .futureValue

Note that .to(Sink.foreach{...}).run() does not expose the Future[Done] you need to synchronize on. Your code needs to change to .toMat(Sink.foreach{...})(Keep.right).run(), which can be abbreviated to .runForeach(...).
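If you are not using ScalaTest, the same idea can be sketched with toMat and Keep.right plus a blocking Await. This is only a minimal sketch: it assumes Akka 2.6 or later (where an implicit ActorSystem also provides the materializer), and the names WaitForStream and runPipeline, the system name "demo", and the 3-second timeout are made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object WaitForStream {
  // Hypothetical helper: builds the pipeline from the question and returns
  // the sink's Future[Done] instead of discarding it.
  def runPipeline()(implicit system: ActorSystem): Future[Done] = {
    val increase   = Flow[Int].map(_ * 10)
    val filterFlow = Flow[Int].filter(_ > 50).take(2)

    Source(0 to 10)
      .via(increase)
      .via(filterFlow)
      // Keep.right selects the sink's materialized value (Future[Done]).
      .toMat(Sink.foreach[Int](value => println(s"Item emitted:$value")))(Keep.right)
      .run()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, where the implicit ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("demo")
    try Await.result(runPipeline(), 3.seconds) // block until the stream completes
    finally Await.result(system.terminate(), 3.seconds)
  }
}
```

Blocking with Await is fine for a test or a small demo like this one; in application code you would normally compose on the Future instead.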

1
votes

Because what you are saying is the following:

For the numbers 1..10: multiply them by 10, but only ever produce the first 2 elements, then keep all of those elements which are greater than 50, and then print them.
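To make the effect of operator order concrete, here is a small sketch using plain Scala collections rather than a stream (the object and value names are made up for illustration). Taking before filtering leaves only 10 and 20, neither of which passes the filter, whereas filtering first yields 60 and 70.

```scala
object OperatorOrder {
  // 10, 20, ..., 100
  val scaled: Seq[Int] = (1 to 10).map(_ * 10)

  // take(2) keeps only 10 and 20; neither is greater than 50, so nothing is left.
  val takeThenFilter: Seq[Int] = scaled.take(2).filter(_ > 50)

  // filter first keeps 60..100; take(2) then keeps 60 and 70.
  val filterThenTake: Seq[Int] = scaled.filter(_ > 50).take(2)

  def main(args: Array[String]): Unit = {
    println(s"take then filter: $takeThenFilter")
    println(s"filter then take: $filterThenTake")
  }
}
```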

Additionally, your test does not wait for the completion of the RunnableGraph, which typically means that your program will exit before the stream has had a chance to run (Akka Streams run asynchronously).

Note that for your example there's no reason to use the GraphDSL; your code is equivalent to:

Source(1 to 10).map(_ * 10).take(2).filter(_ > 50).runForeach(println)

But since it isn't really doing anything "meaningfully async" I think you'd be much better off with:

(1 to 10).map(_ * 10).take(2).filter(_ > 50).foreach(println)

But then again, with the current state of the code, it is equivalent to the following expression:

()