0
votes

I am using Akka WebSockets for client-server communication.

On the client:

source
    .viaMat(Http().webSocketClientFlow(httpReq, settings))(Keep.both)
    .toMat(sink)(Keep.both)
    .run()

where source is

val source = Source.actorRef[Object](Int.MaxValue,OverflowStrategy.fail)

And the sink forwards all the messages to ClientActor.

On the server:

val requestHandler: HttpRequest => HttpResponse = {
      case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) =>
        req.header[UpgradeToWebSocket] match {
          case Some(upgrade: UpgradeToWebSocket) =>

            /**
              * Create an ActorRef for the client. Sending messages to this actor replies to the client.
              * fanout = false, because there is only one output (no broadcasting).
              */
            val source: Source[Object, ActorRef] = Source.actorRef[Object](Int.MaxValue, OverflowStrategy.fail)

            val (clientActor: ActorRef, publisher) = source
              .via(flow)
              .toMat(Sink.asPublisher(fanout = false))(Keep.both).run()

            // receives messages from the client

            val sink: Sink[Message, NotUsed] = Common.getSink(messageHandler, clientActor)

            upgrade.handleMessagesWithSinkSource(sink, Source.fromPublisher(publisher))

          case None =>
            HttpResponse(400, entity = "Not a valid websocket request!")
        }
      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

And the sink forwards all the messages to the ServerActor.

ServerActor

def receive: PartialFunction[Any, Unit] = {
    case Terminated(sender) =>
      logger.info(s"Detected that client disconnected: $sender")

But when I stop the clientActor with clientActor ! PoisonPill the server takes exactly a minutes to detect that the clientActor has been disconnected. What changes do I need to make in the clientActor or the serverActor to detect the disconnect immideately? Any pointers will be appriciated.

Debugging I've done:

I have tried using system.context.terminate() and with this the ServerActor immidiately detects the disconnect, but then it does not allow me to create a new ClientActor. I run into "Why does Akka fail with "IllegalStateException: cannot create children while terminating or terminated" when testing with ScalaTest?" error.

I did a little digging around and found that the Akka WebSocket uses streamSupervisor actors for communication and both the clientActor and serverActor watches these streamSupervisor actors.

Expectation here is to immideately detect the client disconnection on the serverActor actor.

1

1 Answers

0
votes

You could use the Sink.actorRef for the sink of the flow. After the connection is terminated by the client, the actor will receive this message and you can poison the child actor which corresponding to the WebSocket.