Akka-http documentation says:
Apart from regarding a socket bound on the server-side as a Source[IncomingConnection] and each connection as a Source[HttpRequest] with a Sink[HttpResponse]
Assume we merge several Source[IncomingConnection] into a single source of incoming connections.
Then assume we derive a Source[HttpRequest] from that Source[IncomingConnection] (see the code below).
Providing a flow that converts each HttpRequest into an HttpResponse is then straightforward.
Here is the problem: how do we properly sink the responses? How do we route each response back to the connection its request came from?
The idea behind this use case is to be able to prioritize the processing of incoming requests across different connections, which I would expect to be useful in many cases.
Thanks in advance!
Edit: Solution based on the answer from @RamonJRomeroyVigil:
Server code:
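import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.IncomingConnection
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val in1 = Http().bind(interface = "localhost", port = 8200)
val in2 = Http().bind(interface = "localhost", port = 8201)

// Merge the connections accepted on both ports into a single source.
val connSrc = Source.fromGraph(FlowGraph.create() { implicit b =>
  import FlowGraph.Implicits._
  val merge = b.add(Merge[IncomingConnection](2))
  in1 ~> print("in1") ~> merge.in(0)
  in2 ~> print("in2") ~> merge.in(1)
  SourceShape(merge.out)
})

// Read the requests of each connection by running conn.flow with an empty response source.
val reqSrc: Source[(HttpRequest, IncomingConnection), _] =
  connSrc.flatMapConcat { conn =>
    Source.empty[HttpResponse]
      .via(conn.flow)
      .map(request => (request, conn))
  }

val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] =
  Flow[(HttpRequest, IncomingConnection)].map {
    case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) =>
      println(s"${System.currentTimeMillis()}: " +
        s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}")
      (HttpResponse(entity = "pong"), conn)
  }

// Write each response by running conn.flow a second time for the same connection.
reqSrc.via(flow).to(Sink.foreach { case (resp, conn) =>
  Source.single(resp).via(conn.flow).runWith(Sink.ignore)
}).run()

// Logs every accepted connection and passes it through unchanged.
def print(prefix: String) = Flow[IncomingConnection].map { s =>
  println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s
}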
Testing with curl from the console, the first request gets an empty reply:
% curl http://localhost:8200/ping
curl: (52) Empty reply from server
The second curl request cannot connect at all:
% curl http://localhost:8200/ping
curl: (7) Failed to connect to localhost port 8200: Connection refused
On the server console I see the following when sending the first request:
in1 [ 1450287301512 ]: /127.0.0.1:52461
1450287301626: process request from localhost:52461
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Nothing is logged when sending the second request.
So it looks like there is a problem with the internal connection flow (as @RamonJRomeroyVigil stated) or with something else.
In short, the code does not work.
I am still investigating the problem.
Source[IncomingConnection] into a Source[HttpRequest] how would you send your HttpResponse back to the client? Once you've generated your response in your stream pipeline you no longer have a handle to the IncomingConnection... – Ramón J Romero y Vigil
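A possible reading of the behaviour above, which I have not verified against this exact Akka version: the flow of an IncomingConnection is meant to be materialized once per connection, while the posted code runs conn.flow twice (once with an empty response source, once to write the response). A failure in the second materialization would fail the whole stream and unbind both ports, which would match the "Connection refused" on the second request. The sketch below keeps the merged connection source but lets each connection handle its own requests via handleWith, so every response stays attached to its connection. It assumes an Akka HTTP 10.0.x-era API (Http().bind, ActorMaterializer, Source#merge); PingServer, route and start are hypothetical names introduced only for illustration.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow

// Hypothetical example object; not part of the original post.
object PingServer {

  // Pure request-to-response function, so it can be checked without opening a socket.
  def route(request: HttpRequest): HttpResponse = request match {
    case HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, _, _) =>
      HttpResponse(entity = "pong")
    case _ =>
      HttpResponse(StatusCodes.NotFound)
  }

  // Binds both ports, merges the accepted connections and handles each one.
  // The ports are the ones used in the question.
  def start()(implicit system: ActorSystem, materializer: ActorMaterializer) = {
    val in1 = Http().bind(interface = "localhost", port = 8200)
    val in2 = Http().bind(interface = "localhost", port = 8201)

    in1.merge(in2).runForeach { conn =>
      println(s"accepted connection from ${conn.remoteAddress}")
      // handleWith materializes the connection's flow exactly once,
      // wiring requests in and responses out on the same connection.
      conn.handleWith(Flow[HttpRequest].map(route))
    }
  }
}
```

To try it, create an implicit ActorSystem and ActorMaterializer and call PingServer.start(). Note that this only shows how responses stay paired with their connections; it does not by itself prioritize requests across connections, since each connection still runs its own independent flow.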