I have a flow that relies on API responses. When the responses do not conform to what I expect an exception is thrown. This strategy works nicely for Spray and for direct method testing with specs2.
However, when I try to use streams with exception throwing modules the flow simply halts.
This is my flow:
Source(() => file)
.via(csvToSeq)
.via(getFromElastic)
.via(futureExtrtactor)
.via(findLocaionOfId)
.foreach(v => v.map(v => println("foreached", v)))
.onComplete(_ => system.shutdown())
My strategy for this is using map
for futures.
like so:
val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
js.asJsObject("Couldn't convert").getFields("externalId").map({
case JsString(str) => {
(i + 1, i == 0, js)
}
else (i, false, js)
}
case _ => (i, false, x)
})
})
}
}))
This is a potential exception thrower in a completely different location:
val encoded_url = URLEncoder.encode(url, "UTF-8")
Seems like I am missing something but can't see what. Thanks for any pointers.
onComplete
you are disregarding the value of theTry
that is supplied by using_
. You should instead try matching onSuccess
andFailure
and seeing which you get. If a fail you should have the stack. – cmbaxter