I'm currently using a combination of Akka actors, Akka Streams and Akka HTTP to handle a scenario where a client tells the server how quickly it would like to receive events, and the server sends the client events at that rate.
Here's the client message to the server:
{
  "uuid": "d30711c6-6bbf-4f11-9471-638ef7e19dfd",
  "startTime": "1501254050",
  "rate": "1.5"
}
This is received by a route that looks like this:
path("ws") { // /ws path allows websocket connections
  get {
    extractUpgradeToWebSocket { upgrade =>
      complete(upgrade.handleMessagesWithSinkSource(replaySink, Source.maybe))
    }
  }
}
The replaySink just parses the JSON into a case class, which it sends along to a supervisor actor using:
val replaySupervisor = system.actorOf(ReplaySupervisor.props(), "replay-supervisor")
val replaySink: Sink[Message, NotUsed] = Flow[Message]
  .map {
    case tm: TextMessage.Strict =>
      val msgJson = parse(tm.getStrictText).getOrElse(Json.Null)
      msgJson.as[ReplayRequest].getOrElse("JSONParsingError")
    case _ => log.error("Incoming message not in proper format")
  }
  .to(Sink.actorRefWithAck(
    ref = replaySupervisor,
    onInitMessage = InitMessage,
    ackMessage = AckMessage,
    onCompleteMessage = ConnectionClosed
  ))
The ConnectionClosed message only gets sent after the WebSocket connection has timed out, since it's effectively an unbounded source otherwise.
When the supervisor receives the message, it creates a new child actor, giving it the UUID provided by the client, and then passes the message along. If the child already exists (because the client has to keep the connection open by regularly re-sending the same event, or because it changed the start time or rate and sent a new message), the supervisor just passes the message along.
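To make the routing concrete, here is a minimal sketch of what I mean by the supervisor, not the actual code. The ReplayRequest fields mirror the JSON above; ReplayWorker, the case-object definitions of InitMessage/AckMessage/ConnectionClosed, and the choice to use the UUID as the child's actor name are assumptions made for illustration.

```scala
import akka.actor.{Actor, ActorRef, Props}

// Assumed message shapes; the field names mirror the client JSON.
final case class ReplayRequest(uuid: String, startTime: String, rate: String)
case object InitMessage
case object AckMessage
case object ConnectionClosed

// Hypothetical child; the real one would emit events at the requested rate.
class ReplayWorker extends Actor {
  def receive: Receive = {
    case _: ReplayRequest => () // (re)configure start time and rate here
  }
}

class ReplaySupervisor extends Actor {
  def receive: Receive = {
    // Sink.actorRefWithAck expects an ack for the init message and for each element.
    case InitMessage =>
      sender() ! AckMessage

    case req: ReplayRequest =>
      // Look the child up by name (the client's UUID); create it on first contact.
      val child: ActorRef = context.child(req.uuid)
        .getOrElse(context.actorOf(Props(new ReplayWorker), req.uuid))
      child.forward(req)
      sender() ! AckMessage

    case ConnectionClosed =>
      // No UUID arrives with this message, so the supervisor cannot tell
      // which child belongs to the connection that just closed.
      ()
  }
}
```

The last case is exactly where I'm stuck: the completion message carries no information about which child it concerns.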
My question is: once the connection closes, how do I tell the child actor to stop? The sink above has no knowledge of which child actor eventually receives its messages, so I cannot send the ConnectionClosed message with the UUID for the supervisor to forward to the child so that it stops itself. Sending a PoisonPill would just kill the supervisor. The best way I can think of is to set the child's ReceiveTimeout so that it stops itself after it hasn't received an event in a while, and have the client send regular messages.
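For reference, the ReceiveTimeout fallback would look roughly like this. This is only a sketch: TimingOutWorker and the idleTimeout parameter are hypothetical names, and the timeout value would need to be longer than the client's keep-alive interval.

```scala
import akka.actor.{Actor, ReceiveTimeout}
import scala.concurrent.duration.FiniteDuration

// Hypothetical child that stops itself after a period of silence.
class TimingOutWorker(idleTimeout: FiniteDuration) extends Actor {
  // Ask Akka to deliver ReceiveTimeout if no message arrives within idleTimeout.
  context.setReceiveTimeout(idleTimeout)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Nothing heard from the client for idleTimeout: assume the connection is gone.
      context.stop(self)
    case _ =>
      // Any other message (a replay request or a keep-alive) resets the timer.
      ()
  }
}
```

The drawback is that the child lingers for up to idleTimeout after the connection has actually closed, which is what I'd like to avoid.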
Is there a more elegant way to do this, one that stops the actor the moment the connection closes?
uuid1 in multiple messages and uuid2 in multiple messages. One child handles all uuid1 messages, and another child handles all uuid2 messages. Is my understanding correct? – Jeffrey Chung