4
votes

I have a flow that relies on API responses. When the responses do not conform to what I expect an exception is thrown. This strategy works nicely for Spray and for direct method testing with specs2.

However, when I try to use streams with exception throwing modules the flow simply halts.

This is my flow:

 Source(() => file)
      .via(csvToSeq)
      .via(getFromElastic)
      .via(futureExtrtactor)
      .via(findLocaionOfId)
      .foreach(v => v.map(v => println("foreached", v)))
      .onComplete(_ => system.shutdown())

My strategy for this is using map for futures.

like so:

 val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].map(future => future.map(jsSeq => {
      jsSeq.zipWithIndex.flatMap { case (x, i) => x._2.asJsObject.getFields("_source").flatMap(js => {
        js.asJsObject("Couldn't convert").getFields("externalId").map({
          case JsString(str) => {
              (i + 1, i == 0, js)
            }
            else (i, false, js)
          }
          case _ => (i, false, x)
        })
      })
      }
    }))

This is a potential exception thrower in a completely different location:

val encoded_url = URLEncoder.encode(url, "UTF-8")

Seems like I am missing something but can't see what. Thanks for any pointers.

2
I think you need to be a bit more specific here. What is the behavior that is currently happening and what would you prefer instead? If a transform step is failing and that's stopping the rest of the flow, what would you prefer instead? Providing that kind of specifics will help people craft a better answer for you.cmbaxter
In your onComplete you are disregarding the value of the Try that is supplied by using _. You should instead try matching on Success and Failure and seeing which you get. If a fail you should have the stack.cmbaxter
The program never terminates so oncomplete doesn't fireraam86

2 Answers

3
votes

This sounds like an issue that will be addressed once Supervision for Akka Streams is implemented. Akka Streams are still "pre-experimental" so that feature has not been implemented yet, but is definitely planned to be included soon.

// As of writing this comment current version is 1.0-M2 (preview milestone).

3
votes

The thing i was missing was mapAsync.

changing the above function to:

val findLocaionOfId = Flow[Future[Seq[(String, JsValue)]]].mapAsync({...})

This way the futures are unwrapped and exceptions stop the program as expected.