0
votes

I wrote a simple actor that downloads a web page and sends the body of this page to its sender. I use Akka HTTP for building up the HTTP request and handling the HTTP response. Here is my code:

class Downloader(uri: String) extends Actor {

  import akka.pattern.pipe
  import context.dispatcher

  final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))

  val http = Http(context.system)
  http.singleRequest(HttpRequest(uri = uri)) pipeTo self
  println(s"SENDING request to $uri")

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      println(s"HttpResponse: SUCCESS")
      val body = entity.dataBytes.runFold(ByteString(""))(_ ++ _) map (bytes => bytes.decodeString(ByteString.UTF_8)) foreach println
      sender() ! body
      context.stop(self)

    case HttpResponse(code, _, _, _) =>
      println(s"HttpResponse: FAILURE")
      context.stop(self)
  }
}

In the main program I create 10 actors that start downloading their web page in the constructor.

val system = ActorSystem("akkaHttpClient")
for (i <- 1 to 10)
  system.actorOf(Props(classOf[Downloader], "http://akka.io"), s"downloader-$i")

Thread.sleep(20000)

val termFuture = system.terminate()
Await.ready(termFuture, Duration.Inf)

Unfortunately, only 4 of the 10 actors created get a response:

SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
SENDING request to http://akka.io
HttpResponse: SUCCESS
HttpResponse: SUCCESS
HttpResponse: SUCCESS
HttpResponse: SUCCESS

Whats wrong? Did I forget to release some resources?

And is this the right approach to download multiple web pages concurrently with Akka HTTP?

1

1 Answers

5
votes

You are killing the actor without actually waiting for the completion of the response folding. The below should work better:

  def receive = {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      println(s"HttpResponse: SUCCESS")
      entity.dataBytes.runFold(ByteString(""))(_ ++ _) map (bytes => bytes.decodeString(ByteString.UTF_8)) foreach { s =>
        println(s)
        context.stop(self)
      }

    case HttpResponse(code, _, _, _) =>
      println(s"HttpResponse: FAILURE")
      context.stop(self)
  }

Why 4 requests? 4 is the maximum number of connections that your underlying client connection pool can establish, as per the reference configuration.