I'm trying to create a simple Proxy for Websocket connections using Play and akka streams. The traffic flow is like this:
(Client) request -> -> request (Server)
Proxy
(Client) response <- <- response (Server)
I came up with the following code after following some examples:
def socket = WebSocket.accept[String, String] { request =>
val uuid = UUID.randomUUID().toString
// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
source.toMat(sink)(Keep.both).run()
}
// sink that deals with the incoming messages from the Server
val serverIncoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println("The server has sent: " + message.text)
}
// source for sending a message over the WebSocket
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
serverOutgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))
val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))
val finalFlow = {
val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
Flow.fromSinkAndSource(sink, source)
}
finalFlow
With this code, the traffic goes from the Client to the Proxy to the Server, back to the Proxy and that's it. It doesn't reach further to the Client. How can I fix this ?
I think I need to somehow connect the serverIncoming
sink to the source
in the finalFlow
, but I can't figure out how to do it...
Or am I totally wrong with this approach ? Is it better to use a Bidiflow
or a Graph
? I'm new to akka streams and still trying to figure things out.