I'm migrating my old Casbah Mongo drivers to the new async Scala drivers and I'm trying to use this in an Akka stream, and the stream is getting stuck.
I have a GraphStage with createLogic() defined. The code is below. This worked fine with Casbah and I'd hoped the async nature of the new mongo drivers would be a great fit, but here what happens...
If I stream 2 records through this code, the first record flows through and triggers the next step. See output below ('HERE IN SEND' confirms it got through). The second record seems to go through the right steps in BlacklistFilter but Akka never flows to the SEND step.
Any ideas why this is not working with the new drivers?
object BlacklistFilter {
type FilterShape = FanOutShape2[QM[RenderedExpression], QM[RenderedExpression], QM[Unit]]
}
import BlacklistFilter._
case class BlacklistFilter(facilities: Facilities, helloConfig: HelloConfig)(implicit asys: ActorSystem) extends GraphStage[FilterShape] {
val outPass: Outlet[QM[RenderedExpression]] = Outlet("Pass")
val outFail: Outlet[QM[Unit]] = Outlet("Fail")
val reIn: Inlet[QM[RenderedExpression]] = Inlet("Command")
override val shape: FilterShape = new FanOutShape2(reIn, outPass, outFail)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
override def preStart(): Unit = pull(reIn)
setHandler(reIn, new InHandler {
override def onPush(): Unit = {
val cmd = grab(reIn)
val re: RenderedExpression = cmd.body
val check = re.recipient.contacts(re.media).toString
// NEW NON-BLOCKING CODE
//-------------------------------------
facilities.withMongo(helloConfig.msgDB, helloConfig.blacklistColl) { coll =>
var found: Option[Document] = None
coll.find(Document("_id" -> check)).first().subscribe(
(doc: Document) => {
found = Some(doc)
println("BLACKLIST FAIL! " + check)
emit(outFail, cmd)
// no pull() here as this happens on complete below
},
(e: Throwable) => {
// Log something here!
emit(outFail, cmd)
pull(reIn)
},
() => {
if (found.isEmpty) {
println("BLACKLIST OK. " + check)
emit(outPass, cmd)
}
pull(reIn)
println("Pulled reIn...")
}
)
}
// OLD BLOCKING CASBAH CODE THAT WORKED
//-------------------------------------
// await(facilities.mongoAccess().mongo(helloConfig.msgDB, helloConfig.blacklistColl)(_.findOne(MongoDBObject("_id" -> check)))) match {
// case Some(_) => emit(outFail, cmd)
// case None => emit(outPass, cmd)
// }
// pull(reIn)
}
override def onUpstreamFinish(): Unit = {} // necessary for some reason!
})
setHandler(outPass, eagerTerminateOutput)
setHandler(outFail, eagerTerminateOutput)
}
}
Output:
BLACKLIST OK. [email protected]
Pulled reIn...
HERE IN SEND (TemplateRenderedExpression)!!!
ACK!
BLACKLIST OK. 919-919-9119
Pulled reIn...
You can see from the output that the first record flowed nicely to the SEND/ACK steps. The second record printed the BLACKLIST message, meaning it emitted outPass then called pull on reIn... but then nothing happens downstream.
Anyone know why this would work differently in Akka Streams than the Casbah version that worked fine (code shown commented out)?
(I could just convert the Mongo call to a Future and Await on it, and that should work like the old code, but that kinda defeats the whole point of going async!)