4
votes

Akka-http documentation says:

Apart from regarding a socket bound on the server-side as a Source[IncomingConnection] and each connection as a Source[HttpRequest] with a Sink[HttpResponse]

Assume we get the merged source containing incoming connections from many Source[IncomingConnection].

Then, assume, we get Source[HttpRequest] from Source[IncomingConnection] (see the code below).

Then, no problem, we can provide a flow to convert HttpRequest to HttpResponse.

And here is the problem - how can we properly Sink the responses ? How can we join the responses to connections?

The whole idea behind the use case is the possibility to prioritize the processing of incoming requests from different connections. Should be useful in many cases I guess...

Thanks in advance!

Edit: Solution based on the answer from @RamonJRomeroyVigil:

Server code:

val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)

val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
  import FlowGraph.Implicits._

  val merge = b.add(Merge[IncomingConnection](2))

  in1 ~> print("in1") ~> merge.in(0)
  in2 ~> print("in2") ~> merge.in(1)

  SourceShape(merge.out)
})

val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
      .via(conn.flow)
      .map(request => (request, conn))
  }

val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
  Flow[(HttpRequest, IncomingConnection)].map{
      case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
        println(s"${System.currentTimeMillis()}: " +
          s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
        (HttpResponse(entity = "pong"), conn)
    }

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()

def print(prefix: String) = Flow[IncomingConnection].map { s =>
  println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s
}

So, I am using curl from console and see the following:

% curl http://localhost:8200/ping
curl: (52) Empty reply from server

Second curl request fails:

% curl http://localhost:8200/ping
curl: (7) Failed to connect to localhost port 8200: Connection refused

On the server console I see the following when sending 1st request:

in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

And nothing when sending 2nd request.

So, it looks like there is some problem with the internal connection flow (as stated @RamonJRomeroyVigil) or with something else...

Basically the code does not work.

Still investigating the problem.

1
If you convert your Source[IncomingConnection] into a Source[HttpRequest] how would you send your HttpResponse back to the client? Once you've generated your response in your stream pipeline you no longer have a handle to the IncomingConnection...Ramón J Romero y Vigil
@RamonJRomeroyVigil that's the question... Looking to the API I can't find the answer, but thinking logically - this should not be a problem. For example: 1. we have two bound connections: A and B 2. both connections merged to C 3. C produces HttpRequest instances 4. some processing flow converts the HttpRequest to HttpResponse 5. finallly somehow we should route the HttpResponse to the correct connection - looks pretty logical and acceptable.. So, maybe it is possible to use some wrapper of HttpRequest which can include IncomingConnection instance so we can use it later?d-n-ust
I wrote up an answer with a caveat. I recommend that you separate Streams for the different priorities. Maybe open two different ports for the two different priority levels and then use different size thread pools for each of the ports.Ramón J Romero y Vigil
@RamonJRomeroyVigil, I have one resource that I need to call based on requests, so basically I need one pool of workers. I think with some sort of managment on top of that pool of workers it is feasible to load balance the requests based on the incoming requests rates but I need back-pressure enabled. Implementing back-pressure without streams does not look easy and the right thing... but maybe I just didn't get your idead-n-ust

1 Answers

0
votes

The below solution is based on further information provided in the question comments.

Given

val connSrc : Source[IncomingConnection,_] = ???

The flatMapConcat method solves the specific question stated:

val reqSrc : Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
          .via(conn.flow)
          .map(request => (request, conn))
  }

This provides a Source of (HttpRequest, IncomingConnection) tuples.

Assuming you have a processing step that converts requests to respones

val flow : Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] = ???

You can send a response back to the client:

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
})

Warning: This solution calls conn.flow twice: once to create a flow that generates requests and again to create a flow to send responses to. I do not know if this type of use-case will break something within the IncomingConnection logic.