I am using Akka WebSockets for client-server communication.
On the client:
source
  .viaMat(Http().webSocketClientFlow(httpReq, settings))(Keep.both)
  .toMat(sink)(Keep.both)
  .run()
where source is
val source = Source.actorRef[Object](Int.MaxValue,OverflowStrategy.fail)
And the sink forwards all the messages to ClientActor.
On the server:
val requestHandler: HttpRequest => HttpResponse = {
  case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) =>
    req.header[UpgradeToWebSocket] match {
      case Some(upgrade: UpgradeToWebSocket) =>
        /**
         * Create an ActorRef for the client. Sending messages to this actor replies to the client.
         * fanout = false, because there is only one output (no broadcasting).
         */
        val source: Source[Object, ActorRef] = Source.actorRef[Object](Int.MaxValue, OverflowStrategy.fail)
        val (clientActor: ActorRef, publisher) = source
          .via(flow)
          .toMat(Sink.asPublisher(fanout = false))(Keep.both).run()
        // receives messages from the client
        val sink: Sink[Message, NotUsed] = Common.getSink(messageHandler, clientActor)
        upgrade.handleMessagesWithSinkSource(sink, Source.fromPublisher(publisher))
      case None =>
        HttpResponse(400, entity = "Not a valid websocket request!")
    }
  case r: HttpRequest =>
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream
    HttpResponse(404, entity = "Unknown resource!")
}
And the sink forwards all the messages to the ServerActor.
ServerActor:
def receive: PartialFunction[Any, Unit] = {
  case Terminated(sender) =>
    logger.info(s"Detected that client disconnected: $sender")
}
But when I stop the clientActor with clientActor ! PoisonPill, the server takes exactly one minute to detect that the clientActor has disconnected. What changes do I need to make in the clientActor or the serverActor to detect the disconnect immediately? Any pointers will be appreciated.
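For reference, here is a minimal, self-contained sketch of the death-watch mechanism that the Terminated case above relies on. It assumes classic Akka actors and involves no WebSocket at all. Connection, Watcher, Watch and DeathWatchSketch are hypothetical names made up for this illustration, not my real classes. It only shows that Terminated arrives promptly for a local actor that was registered with context.watch; it does not reproduce the one-minute delay.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props, Terminated}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for the per-connection actor; it ignores every message.
class Connection extends Actor {
  def receive: Receive = { case _ => () }
}

// Hypothetical message that hands the watcher a reference to monitor.
final case class Watch(ref: ActorRef)

// Hypothetical stand-in for ServerActor; completes the promise when Terminated arrives.
class Watcher(observed: Promise[ActorRef]) extends Actor {
  def receive: Receive = {
    case Watch(ref) =>
      // Terminated is only delivered for actors registered via context.watch.
      context.watch(ref)
    case Terminated(ref) =>
      println(s"Detected that actor stopped: $ref")
      observed.trySuccess(ref)
  }
}

object DeathWatchSketch {
  // Starts both actors, stops the connection, and reports whether
  // Terminated was observed for it within the timeout.
  def run(): Boolean = {
    val system = ActorSystem("sketch")
    val observed = Promise[ActorRef]()
    val connection = system.actorOf(Props(new Connection), "connection")
    val watcher = system.actorOf(Props(new Watcher(observed)), "watcher")
    watcher ! Watch(connection)
    connection ! PoisonPill
    try Await.result(observed.future, 3.seconds) == connection
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"Terminated observed: ${run()}")
}
```

In this local setup the watcher is notified as soon as the watched actor stops, so the delay in my case presumably comes from how and when the actor on the server side of the WebSocket stream gets stopped, not from death watch itself.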
Debugging I've done:
I have tried using system.context.terminate(), and with this the ServerActor immediately detects the disconnect, but then it does not allow me to create a new ClientActor. I run into the error described in "Why does Akka fail with "IllegalStateException: cannot create children while terminating or terminated" when testing with ScalaTest?".
I did a little digging around and found that Akka WebSocket uses streamSupervisor actors for communication, and both the clientActor and the serverActor watch these streamSupervisor actors.
The expectation here is to detect the client disconnection immediately on the serverActor.