3
votes

I have the following stream that works pretty well:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

and a simple actor to handle the response status and the final message when the stream completes:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

I don't know if this is the best approach to count things and also to handle response status, but it's working so far.

The question is how to pass the original request to the actor in a way I could know what request caused a specific error.

My actor could expect the following instead:

case (request: String, response: HttpResponse) =>

But how to pass that information that I have at the beginning of my pipeline?

I was thinking of to map like this:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

But I have no idea on how to fire the Http flow.

Any suggestion?

1
Try using a host connection pool instead of explicitly opening an outgoing connection per request. That model requires an arbitrary identifier per request that is then returned on the response so you can properly correlate request and responsecmbaxter
Hi @cmbaxter you mean to use this example? doc.akka.io/docs/akka/2.4.4/scala/http/client-side/… but with a String instead?Thiago Pereira

1 Answers

1
votes

With @cmbaxter help, I could solve my problem using the following piece of code:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
  .map(url => HttpRequest(uri = url) -> url)
  .via(poolClientFlow)
  .to(Sink.actorRef(myActor, IsDone))
  .run()

Now my actor is able to receive this:

case (respTry: Try[HttpResponse], request: String) =>