Within an akka stream stage FlowShape[A, B]
, part of the processing I need to do on the A's is to save/query a datastore with a query built with A data. But that datastore driver query gives me a future, and I am not sure how best to deal with it (my main question here).
case class Obj(a: String, b: Int, c: String)
case class Foo(myobject: Obj, name: String)
case class Bar(st: String)
//
class SaveAndGetId extends GraphStage[FlowShape[Foo, Bar]] {
val dao = new DbDao // some dao with an async driver
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
setHandlers(in, out, new InHandler with Outhandler {
override def onPush() = {
val foo = grab(in)
val add = foo.record.value()
val result: Future[String] = dao.saveAndGetRecord(add.myobject)//saves and returns id as string
//the naive approach
val record = Await(result, Duration.inf)
push(out, Bar(record))// ***tests pass every time
//mapping the future approach
result.map { x=>
push(out, Bar(x))
} //***tests fail every time
The next stage depends on the id of the db record returned from query, but I want to avoid Await
. I am not sure why mapping approach fails:
"it should work" in {
val source = Source.single(Foo(Obj("hello", 1, "world")))
val probe = source
.via(new SaveAndGetId))
.runWith(TestSink.probe)
probe
.request(1)
.expectBarwithId("one")//say we know this will be
.expectComplete()
}
private implicit class RichTestProbe(probe: Probe[Bar]) {
def expectBarwithId(expected: String): Probe[Bar] =
probe.expectNextChainingPF{
case r @ Bar(str) if str == expected => r
}
}
When run with mapping future, I get failure:
should work ***FAILED***
java.lang.AssertionError: assertion failed: expected: message matching partial function but got unexpected message OnComplete
at scala.Predef$.assert(Predef.scala:170)
at akka.testkit.TestKitBase$class.expectMsgPF(TestKit.scala:406)
at akka.testkit.TestKit.expectMsgPF(TestKit.scala:814)
at akka.stream.testkit.TestSubscriber$ManualProbe.expectEventPF(StreamTestKit.scala:570)
The async side channels example in the docs has the future in the constructor of the stage, as opposed to building the future within the stage, so doesn't seem to apply to my case.
getAsyncCallack
– cchantep