
I'm currently using a combination of Akka actors, Akka Streams, and Akka HTTP to handle a scenario where a client tells the server how quickly it would like to receive events, and the server sends the client events at that rate.

Here's the client message to the server:

{
"uuid": "d30711c6-6bbf-4f11-9471-638ef7e19dfd",
"startTime": "1501254050",
"rate": "1.5"
}

This is received by a route that looks like this:

path("ws") { // the /ws path accepts WebSocket connections
  get {
    extractUpgradeToWebSocket { upgrade =>
      complete(upgrade.handleMessagesWithSinkSource(replaySink, Source.maybe))
    }
  }
}

The replaySink just parses the JSON into a case class, which it sends along to a supervisor actor using:

val replaySupervisor = system.actorOf(ReplaySupervisor.props(), "replay-supervisor")
val replaySink: Sink[Message, NotUsed] = Flow[Message]
  .map {
    case tm: TextMessage.Strict =>
      // Parse the text frame; fall back to Json.Null if it is not valid JSON
      val msgJson = parse(tm.getStrictText).getOrElse(Json.Null)
      // Decode into ReplayRequest, or forward a marker string on failure
      msgJson.as[ReplayRequest].getOrElse("JSONParsingError")
    case _ =>
      // Binary or streamed frames: log.error returns Unit, which is what gets forwarded
      log.error("Incoming message not in proper format")
  }
  .to(Sink.actorRefWithAck(
    ref = replaySupervisor,
    onInitMessage = InitMessage,
    ackMessage = AckMessage,
    onCompleteMessage = ConnectionClosed
  ))

The ConnectionClosed message only gets sent after the WebSocket connection has timed out, since it's effectively an unbounded source otherwise.

When the supervisor receives the message, it creates a new child actor, names it with the UUID provided by the client, and passes the message along. If the child already exists (because the client has to keep the connection open by regularly resending the same event, or because it changed the start time or rate and sent a new message), the supervisor just forwards the message to it.

My question is, once the connection closes, how do I tell the child actor to stop? The sink above has no knowledge of which child actor eventually receives its messages, so I cannot send the ConnectionClosed message with the UUID for the supervisor to forward to the child to tell it to stop itself. Sending a PoisonPill would just kill the supervisor. The best way I can think of is to set a ReceiveTimeout on the child so that it stops itself after it hasn't received an event in a while, and have the client send regular messages.
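For reference, the ReceiveTimeout fallback described above could look roughly like the sketch below. The `ReplayWorker` class name, the `ReplayRequest` fields, and the 30-second timeout are assumptions for illustration; they are not taken from the actual code.

```scala
import akka.actor.{Actor, ReceiveTimeout}
import scala.concurrent.duration._

// Hypothetical request type; the real ReplayRequest is not shown in the question.
final case class ReplayRequest(uuid: String, startTime: String, rate: String)

// Hypothetical child actor that stops itself after a period of silence.
class ReplayWorker extends Actor {
  // Assumed timeout: the client must send something at least this often.
  context.setReceiveTimeout(30.seconds)

  def receive: Receive = {
    case _: ReplayRequest =>
      // Handle the (possibly updated) start time and rate here.
      // Receiving any message restarts the inactivity timer.
      ()
    case ReceiveTimeout =>
      // Nothing arrived within the timeout window, so assume the client is gone.
      context.stop(self)
  }
}
```

The drawback is the one already mentioned: the child outlives the connection by up to the timeout interval instead of stopping the moment the connection closes.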

Is there a more elegant way to do this that'll stop the actor the moment a connection closes, however?

What do you mean, "if the child already exists"? Can a UUID appear in more than one message during a connection? - Jeffrey Chung
Yes, while the connection is open the client has to regularly send the message with its UUID to keep the connection alive. A client can also change the start-time/rate while the connection is open, and so it would be passed along in that case as well. - Brideau
So while a particular connection is open, the supervisor will see uuid1 in multiple messages and uuid2 in multiple messages. One child handles all uuid1 messages, and another child handles all uuid2 messages. Is my understanding correct? - Jeffrey Chung
Yes, that's correct. - Brideau
Okay. When a connection times out, why wouldn't you stop all the children? It sounds as if you want to stop only one child, but which one (i.e., which UUID)? - Jeffrey Chung

1 Answer

When the Sink.actorRef is materialized, it creates an actor internally, and this actor will be the sender of the messages your actor receives.

You could keep a map in the supervisor from the sender ActorRef of the InitMessage to the child created for that sender. When you get ConnectionClosed (or the failure message, Status.Failure by default, if the stream fails), the sender will be the same, so you can look up the right child and shut it down.
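A minimal sketch of that bookkeeping, assuming one child per connection. The message types mirror the names in the question, but their definitions, the `ReplayRequest` fields, and the `ReplayWorker` placeholder are assumptions rather than the original code.

```scala
import akka.actor.{Actor, ActorRef, Props, Status}

// Hypothetical message types mirroring the names used in the question.
case object InitMessage
case object AckMessage
case object ConnectionClosed
final case class ReplayRequest(uuid: String, startTime: String, rate: String)

// Placeholder child; the real one would emit events at the requested rate.
class ReplayWorker extends Actor {
  def receive: Receive = { case _: ReplayRequest => () }
}

class ReplaySupervisor extends Actor {
  // One entry per open connection: stream-side sender -> child serving it.
  private var childBySender = Map.empty[ActorRef, ActorRef]

  // Stop and forget the child that belongs to the given stream-side sender.
  private def stopChildFor(connection: ActorRef): Unit = {
    childBySender.get(connection).foreach(context.stop)
    childBySender -= connection
  }

  def receive: Receive = {
    case InitMessage =>
      sender() ! AckMessage // the stream waits for this before sending elements

    case req: ReplayRequest =>
      val connection = sender()
      val child = childBySender.getOrElse(connection, {
        // actorOf throws if a child with this name already exists.
        val created = context.actorOf(Props(classOf[ReplayWorker]), req.uuid)
        childBySender += connection -> created
        created
      })
      child ! req
      connection ! AckMessage // every element must be acknowledged

    case ConnectionClosed  => stopChildFor(sender())
    case Status.Failure(_) => stopChildFor(sender()) // default stream-failure message

    case _ =>
      sender() ! AckMessage // ack unparseable input so the stream does not stall
  }
}
```

If a single connection can carry several UUIDs, the map value would need to become a collection of children (for example a `Map[String, ActorRef]` keyed by UUID), and all of them would be stopped on close.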

Another option is to generate a unique id (a UUID, for example) in the route and pass it along in both InitMessage(id) and ConnectionClosed(id).
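This second option could be sketched as follows, assuming an Akka version that still provides `Sink.actorRefWithAck` as used in the question. The `connectionId` field, the `Incoming` wrapper, and the `ReplayRoutes` object are hypothetical names introduced for illustration; JSON parsing is left out.

```scala
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorRef
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink, Source}

// Hypothetical protocol: lifecycle messages now carry a per-connection id.
final case class InitMessage(connectionId: String)
case object AckMessage
final case class ConnectionClosed(connectionId: String)
final case class Incoming(connectionId: String, text: String)

object ReplayRoutes {
  // Builds a fresh sink per connection so the id is baked into its messages.
  def replaySink(supervisor: ActorRef, connectionId: String): Sink[Message, NotUsed] =
    Flow[Message]
      .collect { case tm: TextMessage.Strict => Incoming(connectionId, tm.text) }
      .to(Sink.actorRefWithAck(
        ref = supervisor,
        onInitMessage = InitMessage(connectionId),
        ackMessage = AckMessage,
        onCompleteMessage = ConnectionClosed(connectionId),
        // Assumption: a failed stream is treated the same as a closed one.
        onFailureMessage = (_: Throwable) => ConnectionClosed(connectionId)
      ))

  def route(supervisor: ActorRef): Route =
    path("ws") {
      get {
        extractUpgradeToWebSocket { upgrade =>
          // A new id for every WebSocket upgrade, i.e. per connection.
          val connectionId = UUID.randomUUID().toString
          complete(upgrade.handleMessagesWithSinkSource(
            replaySink(supervisor, connectionId), Source.maybe[Message]))
        }
      }
    }
}
```

The supervisor can then key its children by `connectionId` and stop the matching child when `ConnectionClosed(connectionId)` arrives, without relying on `sender()`.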