I have the following flow:
val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)
val targetSink = Flow[ByteString]
.map(_.utf8String)
.via(new JsonStage())
.map { json =>
MqttMessages.jsonToObject(json)
}
.to(Sink.actorRef(self, "Done"))
sourceRef = Some(Flow[ByteString]
.via(conn.flow)
.to(targetSink)
.runWith(actorSource))
within an Actor
(which is the Sink.actorRef one). The conn.flow
is an incoming TCP Connection using Tcp().bind(address, port)
.
Currently the Sink.actorRef Actor
keeps running when the tcp connection is closed from the client side. Is there a way to register the client side termination of the tcp connection to shutdown the Actor
?
Edit: I tried handling both cases as suggested:
case "Done" =>
context.stop(self)
case akka.actor.Status.Failure =>
context.stop(self)
But when I test with a socket client and cancel it, the actor is not being shutdown. So neither the "Done" message nor the Failure seem to be registered if the TCP connection is terminated.
Here is the whole code:
private var connection: Option[Tcp.IncomingConnection] = None
private var mqttpubsub: Option[ActorRef] = None
private var sourceRef: Option[ActorRef] = None
private val sdcTopic = "out"
private val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)
implicit private val system = context.system
implicit private val mat = ActorMaterializer.create(context.system)
override def receive: Receive = {
case conn: Tcp.IncomingConnection =>
connection = Some(conn)
mqttpubsub = Some(context.actorOf(Props(classOf[MqttPubSub], PSConfig(
brokerUrl = "tcp://127.0.0.1:1883", //all params is optional except brokerUrl
userName = null,
password = null,
//messages received when disconnected will be stash. Messages isOverdue after stashTimeToLive will be discard
stashTimeToLive = 1.minute,
stashCapacity = 100000, //stash messages will be drop first haft elems when reach this size
reconnectDelayMin = 10.millis, //for fine tuning re-connection logic
reconnectDelayMax = 30.seconds
))))
val targetSink = Flow[ByteString]
.alsoTo(Sink.foreach(println))
.map(_.utf8String)
.via(new JsonStage())
.map { json =>
MqttMessages.jsonToObject(json)
}
.to(Sink.actorRef(self, "Done"))
sourceRef = Some(Flow[ByteString]
.via(conn.flow)
.to(targetSink)
.runWith(actorSource))
case msg: MqttMessages.MqttMessage =>
processMessage(msg)
case msg: Message =>
val jsonMsg = JsonParser(msg.payload).asJsObject
val mqttMsg = MqttMessages.jsonToObject(jsonMsg)
try {
sourceRef.foreach(_ ! ByteString(msg.payload))
} catch {
case e: Throwable => e.printStackTrace()
}
case SubscribeAck(Subscribe(topic, self, qos), fail) =>
case "Done" =>
context.stop(self)
case akka.actor.Status.Failure =>
context.stop(self)
}