0
votes

I am trying to create a stream which polls a rest service and unmarshals the json object.

I have created a source.tick which does a http request every 5 seconds. If this succeeds the HttpResponse will contain an OK. If not the service is unavailable. The result will be send to an actor. See the following code:

def poll(pollActor: ActorRef) {

    val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))

    val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
      case HttpResponse(StatusCodes.OK, _, entity, _) =>
        Unmarshal(entity).to[Item]
      case resp @ HttpResponse(code, _, _, _) =>
        log.warning("Request failed, response code: " + code)
        Future.failed(new Exception)
    }

    source.via(flow).runWith(Sink.actorRef[Equals](pollActor,akka.actor.Status.Success(())))
}

An actor will receive the result from the stream as can be seen in the following code:

def receive = {
    case k : Item => println(k)
    case f : Failure => {
      println("We failed: " + f)
    }
}

Where and how should I handle the exception thrown by the future?

2

2 Answers

1
votes

One way of approaching this is to make failures an explicit part of your stream.

    val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
      case HttpResponse(StatusCodes.OK, _, entity, _) =>
        Unmarshal(entity).to[Item].map(Right(_))
      case resp @ HttpResponse(code, _, _, _) =>
        Future.successful(Left(MyDomainFailure("some meaningful message/data")))
    }

Note that now the type of your flow is

Flow[HttpRequest, Either[MyDomainFailure, Item], Future[OutgoingConnection]] 

This has the added value of clarity, making downstream stages aware of the failure and forcing them to handle it (well, in this case not really, because you're using an actor. If you stay within the realm of streams, you will be forced to handle them).

    def receive = {
      case Right(item) => println(item)
      case Left(failure) => {
        println("We failed: " + failure.msg)
      }
    }
0
votes

This was the fix I used, while it does not produce an exception, the Failure contained in the HttpResponse is simply matched in the receive function.

def poll(pollActor: ActorRef) {

  val source = Source.tick(0.seconds, 3.seconds, HttpRequest(uri = Uri(path = Path("/posts/1"))))

  val flow = Http().outgoingConnectionHttps("jsonplaceholder1.typicode.com").mapAsync(1) {
    // Where able to reach the API.
    case HttpResponse(StatusCodes.OK, _, entity, _) =>
      // Unmarshal the json response.
      Unmarshal(entity).to[Item]

    // Failed to reach the API.
    case HttpResponse(code, _, _, _) =>
      Future.successful(code)
  }

    source.via(flow).runWith(Sink.actorRef[Any](pollActor,akka.actor.Status.Success(())))
}

Here we match the Failure produced by the HttpResponse.

def receive = {
  case item: Item => println(item)
  case failure: Failure => {
    log.warning("Request failed, response code: " + failure)
  }
}