I am trying to create a stream which polls a rest service and unmarshals the json object.
I have created a source.tick which does a http request every 5 seconds. If this succeeds the HttpResponse will contain an OK. If not the service is unavailable. The result will be send to an actor. See the following code:
def poll(pollActor: ActorRef) {
val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))
val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
case HttpResponse(StatusCodes.OK, _, entity, _) =>
Unmarshal(entity).to[Item]
case resp @ HttpResponse(code, _, _, _) =>
log.warning("Request failed, response code: " + code)
Future.failed(new Exception)
}
source.via(flow).runWith(Sink.actorRef[Equals](pollActor,akka.actor.Status.Success(())))
}
An actor will receive the result from the stream as can be seen in the following code:
def receive = {
case k : Item => println(k)
case f : Failure => {
println("We failed: " + f)
}
}
Where and how should I handle the exception thrown by the future?