5
votes

Within an Akka Streams stage with shape FlowShape[A, B], part of the processing I need to do on each A is to save to or query a datastore, using a query built from the A's data. The datastore driver returns a Future, however, and I am not sure how best to deal with it (this is my main question).

case class Obj(a: String, b: Int, c: String)
case class Foo(myobject: Obj, name: String)
case class Bar(st: String)
//
class SaveAndGetId extends GraphStage[FlowShape[Foo, Bar]] {
 val dao = new DbDao // some dao with an async driver 

 override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
  setHandlers(in, out, new InHandler with OutHandler {
   override def onPush() = {
    val foo = grab(in)
    val add = foo.record.value()
    val result: Future[String] = dao.saveAndGetRecord(add.myobject)//saves and returns id as string

   //the naive approach
    val record = Await.result(result, Duration.Inf)
    push(out, Bar(record))// ***tests pass every time

  //mapping the future approach
    result.map { x=>
     push(out, Bar(x))
    } //***tests fail every time

The next stage depends on the id of the DB record returned by the query, but I want to avoid Await. I am not sure why the mapping approach fails:

"it should work" in {
  val source = Source.single(Foo(Obj("hello", 1, "world")))
  val probe = source
    .via(new SaveAndGetId)
    .runWith(TestSink.probe)
  probe
   .request(1)
   .expectBarwithId("one")//say we know this will be
   .expectComplete()
 }
 private implicit class RichTestProbe(probe: Probe[Bar]) {
  def expectBarwithId(expected: String): Probe[Bar] = 
   probe.expectNextChainingPF{
    case r @ Bar(str) if str == expected => r
  }
 }

When run with the future-mapping approach, the test fails:

should work ***FAILED***
java.lang.AssertionError: assertion failed: expected: message matching partial function but got unexpected message OnComplete
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:406)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
at akka.stream.testkit.TestSubscriber$ManualProbe.expectEventPF(StreamTestKit.scala:570)

The async side channels example in the docs has the future in the constructor of the stage, as opposed to building the future within the stage, so it doesn't seem to apply to my case.

2
You can use getAsyncCallback. (cchantep)
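
A minimal sketch of what the getAsyncCallback suggestion could look like for this stage; it is not the asker's code. It assumes a hypothetical DbDao trait passed in through the constructor, that Foo exposes myobject directly, and that one query in flight at a time is acceptable. The background: GraphStageLogic is not thread-safe, so push must not be called from the Future's thread, and the default onUpstreamFinish completes the stage as soon as Source.single is done, which would be consistent with the OnComplete seen in the failing test.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the question's types and DAO.
final case class Obj(a: String, b: Int, c: String)
final case class Foo(myobject: Obj, name: String)
final case class Bar(st: String)
trait DbDao { def saveAndGetRecord(o: Obj): Future[String] }

class SaveAndGetId(dao: DbDao) extends GraphStage[FlowShape[Foo, Bar]] {
  val in: Inlet[Foo] = Inlet("SaveAndGetId.in")
  val out: Outlet[Bar] = Outlet("SaveAndGetId.out")
  override val shape: FlowShape[Foo, Bar] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var inFlight = false     // true while a query is pending
      private var upstreamDone = false // upstream finished during a query

      // The callback body runs inside the stage, so push/failStage are safe here.
      private val onResult = getAsyncCallback[Try[String]] {
        case Success(id) =>
          inFlight = false
          push(out, Bar(id))
          if (upstreamDone) completeStage()
        case Failure(ex) =>
          failStage(ex)
      }

      override def onPush(): Unit = {
        inFlight = true
        // The Future completes on another thread; it only hands the result over.
        dao.saveAndGetRecord(grab(in).myobject)
          .onComplete(onResult.invoke)(materializer.executionContext)
      }

      override def onPull(): Unit = pull(in)

      // Hold back completion while a query is pending, or its result is lost.
      override def onUpstreamFinish(): Unit =
        if (inFlight) upstreamDone = true else completeStage()

      setHandlers(in, out, this)
    }
}
```

With a stage shaped like this, Source.single(foo).via(new SaveAndGetId(dao)) should emit the Bar before completing, because completion is deferred until the pending callback has pushed its element.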

2 Answers

8
votes

I agree with Ramon. Constructing a new FlowShape is not necessary in this case, and it is too complicated. It is much more convenient to use the mapAsync method.

Here is a code snippet to utilize mapAsync:

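import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object MapAsyncExample {

  // Needed to run the stream. On Akka 2.6+ it also supplies the materializer;
  // older versions additionally need an implicit ActorMaterializer().
  implicit val system: ActorSystem = ActorSystem("MapAsyncExample")

  // Maximum number of futures in flight at once
  val parallelism: Int = 10

  def main(args: Array[String]): Unit = {
    Source.repeat(5)                               // dummy source
      .mapAsync(parallelism)(x => asyncSquare(x))  // asynchronous step
      .runWith(Sink.foreach(println))              // next stage
  }

  //This method returns a Future
  //You can replace this part with your database operations
  def asyncSquare(value: Int): Future[Int] = Future {
    value * value
  }
}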

In the snippet above, Source.repeat(5) is a dummy source that emits 5 indefinitely. The sample function asyncSquare takes an integer and computes its square in a Future. The line .mapAsync(parallelism)(x => asyncSquare(x)) calls that function and emits each Future's result to the next stage. In this snippet, the next stage is a sink that prints every item.

parallelism is the maximum number of asyncSquare calls that can run concurrently.
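
To illustrate the parallelism parameter, here is a small sketch showing that mapAsync still emits results in upstream order even when the futures finish out of order. The object name MapAsyncOrdering and the helper slowSquare are hypothetical, and the code assumes Akka 2.6+, where an implicit ActorSystem is enough to materialize a stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncOrdering {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("MapAsyncOrdering")
    import system.dispatcher // execution context for the Futures below

    // Hypothetical slow call: smaller inputs take longer to finish.
    def slowSquare(x: Int): Future[Int] = Future {
      Thread.sleep((4 - x) * 50L)
      x * x
    }

    try Await.result(
      Source(1 to 3)
        .mapAsync(parallelism = 3)(slowSquare) // up to 3 futures run at once
        .runWith(Sink.seq),
      5.seconds)
    finally system.terminate()
  }
}
```

If ordering does not matter, mapAsyncUnordered takes the same arguments and emits each result as soon as its future completes.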

3
votes

I think your GraphStage is unnecessarily overcomplicated. The Flow below performs the same actions without the need to write a custom stage:

val dao = new DbDao

val parallelism = 10 //number of parallel db queries

val SaveAndGetId : Flow[Foo, Bar, _] = 
  Flow[Foo]
    .map(foo => foo.record.value().myobject)
    .mapAsync(parallelism)(rec => dao.saveAndGetRecord(rec))
    .map(Bar.apply)

I generally try to treat GraphStage as a last resort; there is almost always an idiomatic way of getting the same Flow using the methods provided by the akka-stream library.
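
As a rough check of this approach against the single-element scenario from the question, the sketch below runs such a Flow with a stubbed DAO. StubDao, FlowCheck and the fixed id "one" are hypothetical, Foo is assumed to expose myobject directly, and Akka 2.6+ is assumed for the implicit materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

final case class Obj(a: String, b: Int, c: String)
final case class Foo(myobject: Obj, name: String)
final case class Bar(st: String)

// Hypothetical stub standing in for the asynchronous DAO.
object StubDao {
  def saveAndGetRecord(o: Obj): Future[String] = Future.successful("one")
}

object FlowCheck {
  val saveAndGetId: Flow[Foo, Bar, NotUsed] =
    Flow[Foo]
      .map(_.myobject)
      .mapAsync(parallelism = 1)(StubDao.saveAndGetRecord)
      .map(Bar.apply)

  def run(): Seq[Bar] = {
    implicit val system: ActorSystem = ActorSystem("FlowCheck")
    try Await.result(
      Source.single(Foo(Obj("hello", 1, "world"), "foo"))
        .via(saveAndGetId)
        .runWith(Sink.seq), // collects everything emitted before completion
      5.seconds)
    finally system.terminate()
  }
}
```

Because mapAsync waits for pending futures before propagating completion, the element arrives before the stream completes.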