The WebSocket does consist of two separate streams, it's just that those streams are (likely) not on the same JVM.
You have two peers communicating, one being the server the other one the client, but from the point of an established WebSocket connection the difference doesn't matter anymore. One stream of data is peer 1 sending messages to peer 2, the other stream is peer 2 sending messages to peer 1 and then there is a network boundary between those two peers. If you look at it one peer at a time, you have peer 1 receiving messages from peer 2 and in the second stream peer 1 is sending messages to peer 2.
Each peer has a Sink for the receiving part and a Source for the sending part. You do actually have two Sources and two Sinks in total, just not both on the same ActorSystem (assuming for the sake of explanation that both peers are implemented in Akka HTTP). The Source from peer 1 is connected to the Sink of peer 2 and the Source of peer 2 is connected to the Sink of peer 1.
So, you write a Sink that describes how to deal with incoming messages over the first stream and a Source that describes how to send messages over the second stream. Often you want to produces messages based on the ones that you are receiving, so you can connect those two together and route the messages through different flows that describe how to react and incoming messages and produce any number of outgoing messages. The Flow[Message, Message, _]
does not mean that you're transforming outgoing messages to incoming messages but rather incoming messages to outgoing messages.
The webSocketFlow
is a typical asynchronous boundary, a flow that represent the other peer. It's "transforming" outgoing messages to incoming messages by sending them to the other peer and emitting whatever the other peer sends.
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
This flow is your peer's half of the two streams:
[message from other peer]
connected to printSink
helloSource
connected to [message to the other peer]
There is no relation between incoming messages and outgoing messages, you just print everything you receive and send a single "hello world!". Actually, since the source completes after one message, the WebSocket connection closes as well, but if you replace the Source with for example Source.repeat
, you'd be constantly sending (flooding, really) "hello, world!" over the wire, regardless of the rate of incoming messages.
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
Here you take everything coming from outgoing
, which is the messages you want to send, route it through the webSocketFlow
, which "transforms" the message by communicating with the other peer and produces every received message into incoming
. Often you have a wire protocol where you encode and decode your case class/pojo/dto messages into and from the wire format.
val encode: Flow[T, Message, _] = ???
val decode: Flow[Message, T, _] = ???
val upgradeResponse = outgoing
.via(encode)
.viaMat(webSocketFlow)(Keep.right)
.via(decode)
.to(incoming)
.run()
Or you can imagine some kind chat server (ah, websockets and chats), which broadcasts and merges messages from and to a number of clients. This should take any message from any client and send it to every client (for demonstration only, untested and probably not what you would want for an actual chat server):
val chatClientReceivers: Seq[Sink[Message, NotUsed]] = ???
val chatClientSenders: Seq[Source[Message, NotUsed]] = ???
// each of those receivers/senders could be paired in their own websocket compatible flow
val chatSockets: Seq[Flow[Message, Message, NotUsed]] =
(chatClientReceivers, chatClientSenders).zipped.map(
(outgoingSendToClient, incomingFromClient) =>
Flow.fromSinkAndSource(outgoingSendToClient, incomingFromClient))
val toClients: Graph[SinkShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Message](chatClientReceivers.size))
(broadcast.outArray, chatClientReceivers).zipped
.foreach((bcOut, client) => bcOut ~> b.add(client).in)
SinkShape(broadcast.in)
}
val fromClients: Graph[SourceShape[Message], NotUsed] =
GraphDSL.create() {implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Message](chatClientSenders.size))
(merge.inSeq, chatClientSenders).zipped
.foreach((mIn, client) => b.add(client).out ~> mIn)
SourceShape(merge.out)
}
val upgradeResponse: Future[WebSocketUpgradeResponse] =
Source.fromGraph(fromClients)
.viaMat(webSocketFlow)(Keep.right)
.to(toClients)
.run()
Hope this helps a bit.