Consider a proxy application built on akka-streams and akka-http. It acts as a TCP server that accepts messages in a home-grown format, turns them into HTTP requests, sends those to another HTTP server, converts each HTTP response back to the home-grown format, and replies to the client. Simplified code below:
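// as Client part
// (simplified: the pool itself takes (HttpRequest, CustHttpReq) pairs and emits (Try[HttpResponse], CustHttpReq))
val connPool = Http().cachedHostConnectionPool[CustHttpReq](someHost, somePort)
val asClientFlow = Flow[CustHttpReq]
  .via(connPool)
  .map(procHttpResp)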
def procHttpResp(p: (Try[HttpResponse], CustHttpReq)): Future[ByteString] = {
  val (rsp, src) = p
  rsp match {
    case Success(response: HttpResponse) =>
      cvtToHomeGrown(response, src)
    case Failure(ex) => ...
  }
}
// src is the original request, passed along with the response
def cvtToHomeGrown(rsp: HttpResponse, src: CustHttpReq): Future[ByteString] = {
  rsp.entity.dataBytes.runWith(Sink.fold(ByteString.empty)(_ ++ _))
    // cvtToHomeGrownActually has signature String => ByteString; decoding as UTF-8 is an assumption
    .map(bytes => cvtToHomeGrownActually(bytes.utf8String))
}
// as Server part
val parseAndAskFlow = Flow[ByteString]
  .via(Framing.delimiter(
    ByteString('\n'),
    maximumFrameLength = 1024)) // required parameter; 1024 is an assumed limit
  .map(buf => cvtToCustHttpReq(buf))
  .via(asClientFlow) // plug in the asClient part; the problem is here
val asServerConn: Source[IncomingConnection, Future[ServerBinding]] = Tcp().bind("localhost", port)
asServerConn.runForeach(conn => conn.handleWith(parseAndAskFlow))
The problem is that conn.handleWith requires a Flow[ByteString, ByteString, _], but the HTTP client code (rsp.entity.dataBytes...) returns Future[ByteString], so parseAndAskFlow has type Flow[ByteString, Future[ByteString], _] and I have no idea where best to resolve the Future. I suspect blocking is a bad idea altogether, since everything here is a stream and an Await somewhere would stall the otherwise nice asynchronous processing, but as it stands the code does not compile.
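
For reference, here is a minimal sketch of one way a Future-returning step might be flattened inside a stream without any Await, using the mapAsync operator instead of map. It assumes Akka 2.6+ (where an implicit ActorSystem also provides the materializer). The names MapAsyncSketch, slowUpper, flattened and demo are hypothetical stand-ins and not part of the proxy code, and the parallelism of 4 is an arbitrary choice.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object MapAsyncSketch {
  // Assumption: Akka 2.6+, where an implicit ActorSystem also supplies the materializer.
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  // Hypothetical stand-in for procHttpResp: any ByteString => Future[ByteString].
  def slowUpper(in: ByteString): Future[ByteString] =
    Future(ByteString(in.utf8String.toUpperCase))

  // .map(slowUpper) would yield Flow[ByteString, Future[ByteString], NotUsed].
  // .mapAsync runs up to `parallelism` Futures at once and emits their results
  // downstream in input order, so no Await is needed.
  val flattened: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].mapAsync(parallelism = 4)(slowUpper)

  // Small demo: push two frames through the flow and collect the output.
  def demo(): Future[Seq[String]] =
    Source(List(ByteString("a"), ByteString("b")))
      .via(flattened)
      .map(_.utf8String)
      .runWith(Sink.seq)
}
```

If this approach fits, the analogous change in the code above would presumably be to replace .map(procHttpResp) with .mapAsync(n)(procHttpResp) in asClientFlow, so that the flow emits ByteString rather than Future[ByteString].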