I'm new to Akka and Scala and am teaching myself both in order to build a small project with WebSockets. The end goal is simple: a basic chat server that publishes and subscribes to messages on a web page.
After perusing the Akka docs, I found the pages that are relevant to my goal, namely this and this.
Using dynamic junctions (aka MergeHub & BroadcastHub) and the Flow.fromSinkAndSourceCoupled() method, I was able to achieve a very basic example of what I wanted. We can even get a kill switch by following the example from the Akka docs, which I have adapted below. The code looks like this:
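// Requires an implicit Materializer (or ActorSystem) in scope for run().
private lazy val connHub: Flow[Message, Message, UniqueKillSwitch] = {
  // Run the hub once; every connection attaches to the same sink/source pair.
  val (sink, source) = MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()
  // Each materialization of the returned flow yields its own UniqueKillSwitch.
  Flow.fromSinkAndSourceCoupled(sink, source).joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
}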
However, I now see one issue. The above returns a Flow that will be consumed by Akka's WebSocket directive: akka.http.scaladsl.server.Directives.handleWebSocketMessages(/* FLOW GOES HERE */)
That means the Akka code itself will materialize this flow for me, as long as I provide it as the handler.
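For concreteness, here is a minimal sketch of how the flow gets handed to the directive. The object name `ChatRoute`, the path `chat`, and the actor system name are placeholders I made up for illustration, and I'm assuming Akka 2.6+, where an implicit ActorSystem also supplies the materializer.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub}

object ChatRoute {
  // Assumption: Akka 2.6+, so the implicit ActorSystem provides the Materializer.
  implicit val system: ActorSystem = ActorSystem("chat")

  private lazy val connHub: Flow[Message, Message, UniqueKillSwitch] = {
    val (sink, source) =
      MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()
    Flow.fromSinkAndSourceCoupled(sink, source)
      .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
  }

  // "chat" is a hypothetical path. The directive accepts Flow[Message, Message, Any],
  // so the UniqueKillSwitch type is erased here and Akka HTTP materializes the flow
  // itself for every incoming connection.
  val route: Route =
    path("chat") {
      handleWebSocketMessages(connHub)
    }
}
```

Nothing in this route gives my own code a chance to see the value produced when the flow is materialized.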
But let's say I wanted to arbitrarily kill one user's connection through a KillSwitch (maybe because their session has expired in my application). Each user's WebSocket is attached through the above handler, but since my code never explicitly materializes that flow, I don't get access to its KillSwitch. Therefore I can't kill the connection myself; only the user can, by leaving the webpage.
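To show what I mean, here is a rough sketch of the case where I do get a switch: if I materialize the same flow by hand, the UniqueKillSwitch comes back from run(). The object and method names are hypothetical, and Source.maybe / Sink.ignore are only stand-ins for a real client, which is exactly the part the WebSocket directive handles for me.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.Message
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Sink, Source}

object ManualMaterialization {
  // Hypothetical helper: builds the hub flow and materializes it once by hand.
  // Assumes Akka 2.6+, where the implicit ActorSystem provides the Materializer.
  def materializeOnce()(implicit system: ActorSystem): UniqueKillSwitch = {
    val (hubSink, hubSource) =
      MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()

    val connHub: Flow[Message, Message, UniqueKillSwitch] =
      Flow.fromSinkAndSourceCoupled(hubSink, hubSource)
        .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)

    // Stand-in "client": never sends anything and discards whatever it receives.
    // Keep.right keeps the flow's materialized value, i.e. the kill switch.
    Source.maybe[Message]
      .viaMat(connHub)(Keep.right)
      .to(Sink.ignore)
      .run()
  }
}
```

Calling shutdown() on the returned switch would end that one hand-made connection. With handleWebSocketMessages, the equivalent run() happens inside Akka HTTP, so I never see the returned value.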
It seems strange to me that the docs mention the kill switch approach without showing how to obtain one when using the WebSocket API.
Can anyone suggest how I could obtain a kill switch per connection? Or do I have a fundamental misunderstanding of how this should work?
Thanks in advance.