1
votes

I'm trying to subscribe to the log stream of a remote akka ActorSystem, basically to write a console that shows the running logs of the remote Actors.

The only way I can think to do this is to: create an Actor within the logging ActorSystem, have that Actor subscribe to the ActorSystem.eventStream and then subscribe to that Actor using actorSelection from within my console's ActorSystem.

But this seems very "indirect" since the log pipeline would look like:

logging Actor --> eventStream --> Actor subscribed to eventStream --> local Actor

Is there an easier way to subscribe to the event stream?

1

1 Answers

2
votes

From a simplicity viewpoint, nothing forbids you to subscribe a remote actor to your event stream without an additional actor. The Akka documentation mentions:

The event stream is a local facility, meaning that it will not distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly). If you need to broadcast events in an Akka cluster, without knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: Distributed Publish Subscribe in Cluster.

For illustration purposes, consider the following code fragment which corresponds to the remote system, the one you want to subscribe to:

  class PublisherActor extends Actor with ActorLogging { // example publisher actor just to generate some logs
    context.system.scheduler.schedule(1.second, 1.second, self, "echo")
    def receive = {
      case "echo" ⇒
        val x = Random.nextInt(100)
        log.info(s"I got a random number: $x")
    }
  }

  def runPublisher() = {
    println("=== running publisher node ===")
    val system = ActorSystem("PublisherSystem")
    val selection = system.actorSelection("akka.tcp://[email protected]:2553/user/subscriber")
    selection.resolveOne(10.seconds) onSuccess { // when the listener actor is available,
      case listener ⇒ system.eventStream.subscribe(listener, classOf[LogEvent]) // subscribe it to the event stream
    }
    val publisher = system.actorOf(Props[PublisherActor], "publisher") // some example publisher
  }

And then the corresponding subscriber in the "local" node, from where you want to show the logs:

  class SubscriberActor extends Actor with ActorLogging {
    log.info("subscriber listening...")
    def receive = {
      case msg ⇒ log.info(s"Got: $msg")
    }
  }

  def runSubscriber() = {
    println("=== running subscriber node ===")
    val system = ActorSystem("SubscriberSystem")
    val listener = system.actorOf(Props[SubscriberActor], "subscriber")
  }

However, there are several caveats to this solution, as the fact that the publisher must be running before the subscriber (or the subscriber implement some retry policy until the publisher is up), the location is hardcoded and so on. If you want to have a more robust and resilient system and it's permissible, follow the advice in the documentation and use a distributed publisher-subscriber in a clustered environment which poses several advantages with a similar amount of boilerplate.

Hope it helped!