I have the following flow:

val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)

val targetSink = Flow[ByteString]
    .map(_.utf8String)
    .via(new JsonStage())
    .map { json =>
      MqttMessages.jsonToObject(json)
    }
    .to(Sink.actorRef(self, "Done"))

sourceRef = Some(Flow[ByteString]
    .via(conn.flow)
    .to(targetSink)
    .runWith(actorSource))

This runs inside an Actor, the same one that Sink.actorRef targets. conn.flow belongs to an incoming TCP connection obtained from Tcp().bind(address, port).

Currently the Sink.actorRef Actor keeps running when the TCP connection is closed from the client side. Is there a way to detect the client-side termination of the TCP connection so that I can shut down the Actor?

Edit: I tried handling both cases as suggested:

case "Done" =>
  context.stop(self)

case akka.actor.Status.Failure(_) =>
  context.stop(self)

But when I test with a socket client and cancel it, the actor is not shut down. So neither the "Done" message nor the Failure seems to arrive when the TCP connection is terminated.

Here is the whole code:

private var connection: Option[Tcp.IncomingConnection] = None
private var mqttpubsub: Option[ActorRef] = None
private var sourceRef: Option[ActorRef] = None

private val sdcTopic = "out"
private val actorSource = Source.actorRef(10000, OverflowStrategy.dropHead)

implicit private val system = context.system
implicit private val mat = ActorMaterializer.create(context.system)

override def receive: Receive = {

case conn: Tcp.IncomingConnection =>
  connection = Some(conn)

  mqttpubsub = Some(context.actorOf(Props(classOf[MqttPubSub], PSConfig(
    brokerUrl = "tcp://127.0.0.1:1883", // all params are optional except brokerUrl
    userName = null,
    password = null,
    // messages received while disconnected are stashed; messages older than stashTimeToLive are discarded
    stashTimeToLive = 1.minute,
    stashCapacity = 100000, // when the stash reaches this size, the first half of its elements is dropped
    reconnectDelayMin = 10.millis, //for fine tuning re-connection logic
    reconnectDelayMax = 30.seconds
  ))))

  val targetSink = Flow[ByteString]
    .alsoTo(Sink.foreach(println))
    .map(_.utf8String)
    .via(new JsonStage())
    .map { json =>
      MqttMessages.jsonToObject(json)
    }
    .to(Sink.actorRef(self, "Done"))

  sourceRef = Some(Flow[ByteString]
    .via(conn.flow)
    .to(targetSink)
    .runWith(actorSource))

case msg: MqttMessages.MqttMessage =>
  processMessage(msg)

case msg: Message =>
  val jsonMsg = JsonParser(msg.payload).asJsObject
  val mqttMsg = MqttMessages.jsonToObject(jsonMsg)

  try {
    sourceRef.foreach(_ ! ByteString(msg.payload))
  } catch {
    case e: Throwable => e.printStackTrace()
  }


case SubscribeAck(Subscribe(topic, self, qos), fail) =>

case "Done" =>
  context.stop(self)

case akka.actor.Status.Failure(_) =>
  context.stop(self)
}

2 Answers

"the Actor keeps running"

Which actor do you mean, the one you registered with Sink.actorRef? If so, then to shut it down when the stream ends, handle the "Done" and akka.actor.Status.Failure messages in it and call context.stop(self) explicitly. "Done" is sent when the stream completes successfully; Status.Failure is sent when the stream fails with an error.

For more information, see the Sink.actorRef API docs, which explain the termination semantics.
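
As a rough sketch of the two cases, the matching logic could look like the following. It assumes the classic two-argument Sink.actorRef(ref, onCompleteMessage) with "Done" as the completion message; the object and method names are made up for illustration. One detail worth checking: a bare `case Status.Failure =>` (without an argument list) compares the message against the companion object, so it does not match an actual failure message, whereas `Status.Failure(_)` does.

```scala
import akka.actor.Status

// Hypothetical helper, for illustration only: decides whether a message
// received from Sink.actorRef means that the stream has ended.
object TerminationPatterns {

  // Extractor pattern: matches any Status.Failure instance.
  def isStreamEnd(msg: Any): Boolean = msg match {
    case "Done"            => true  // the onCompleteMessage given to Sink.actorRef
    case Status.Failure(_) => true  // the stream failed with some cause
    case _                 => false // a regular stream element
  }

  // Bare identifier pattern: equality check against the companion object,
  // so real failure messages fall through to the default case.
  def isStreamEndBarePattern(msg: Any): Boolean = msg match {
    case "Done"         => true
    case Status.Failure => true
    case _              => false
  }
}
```

Inside the actor, the first variant would correspond to `case "Done" | Status.Failure(_) => context.stop(self)`.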

I ended up creating another Stage, which passes elements through unchanged but also emits an additional message to the next flow when the upstream closes:

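import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

class TcpStage extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in = Inlet[ByteString]("TCPStage.in")
  val out = Outlet[ByteString]("TCPStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Marker element sent downstream once the upstream has closed.
    private val doneMarker = ByteString("{ }".getBytes("utf-8"))

    setHandler(out, new OutHandler {
      // Forward downstream demand to the upstream.
      override def onPull(): Unit = pull(in)
    })

    setHandler(in, new InHandler {
      // Pass every element through unchanged.
      override def onPush(): Unit = push(out, grab(in))

      // Upstream closed: emit the marker, then complete the stage.
      // emit waits for downstream demand; a bare push fails if out has not been pulled.
      override def onUpstreamFinish(): Unit =
        emit(out, doneMarker, () => completeStage())
    })
  }
}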

Which I then use in my flow:

  val targetSink = Flow[ByteString]
    .via(new TcpStage())
    .map(_.utf8String)
    .via(new JsonStage())
    .map { json =>
      MqttMessages.jsonToObject(json)
    }
    .to(Sink.actorRef(self, MqttDone))

  sourceRef = Some(Flow[ByteString]
    .via(conn.flow)
    .to(targetSink)
    .runWith(actorSource))
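
For comparison, a similar "pass through, then append a marker on completion" effect can probably be had without a custom stage by using the built-in concat operator. This is a different technique from the stage above, shown only as a sketch; it assumes Akka 2.5.x (ActorMaterializer), and the object name, system name, and run helper are made up for the demonstration. Note that concat only appends the marker when the upstream completes normally, not when it fails.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original code.
object ConcatMarker {

  // Same marker element the custom stage emits.
  val doneMarker: ByteString = ByteString("{ }")

  // Passes all elements through, then appends the marker once upstream completes.
  val withMarker = Flow[ByteString].concat(Source.single(doneMarker))

  // Runs the flow over some in-memory input and collects the output as strings.
  def run(inputs: List[String]): Seq[String] = {
    implicit val system = ActorSystem("concat-marker")
    implicit val mat = ActorMaterializer()
    try {
      val result = Source(inputs.map(s => ByteString(s)))
        .via(withMarker)
        .map(_.utf8String)
        .runWith(Sink.seq)
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

In the flow above, `.via(ConcatMarker.withMarker)` would take the place of `.via(new TcpStage())`.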