3
votes

What is the correct way to have a Broadcast actor dynamically add/remove routees over time?

Context of the problem: an actor listens for price changes of a particular item and then broadcasts each change to all other actors (the routees), which act on it according to their own internal rules (for example, buy or sell if the price is X).

I am new to Akka. After reading through the documentation I believe I have identified the components I need, but if you think my design or choice of components is wrong, please comment or answer.

I want to change from a fixed list of routees:

        ActorRef actor1 = system.actorOf(new Props(LimitOrderActor.class));
        ActorRef actor2 = system.actorOf(new Props(LimitOrderActor.class));
        ActorRef actor3 = system.actorOf(new Props(LimitOrderActor.class));
        Iterable<ActorRef> routees = Arrays.asList(new ActorRef[] { actor1, actor2, actor3 });
        ActorRef actorBroadcastRouter1 = system.actorOf(new Props(TickerWatcherActor.class).withRouter(BroadcastRouter.create(routees)), "router1");

to something like a dynamically sized BroadcastRouter, where the routee actors are created after the BroadcastRouter is up and running:

        int lowerBound = 1;
        int upperBound = 10000;
        DefaultResizer resizer = new DefaultResizer(lowerBound, upperBound);
        BroadcastRouter broadcastRouter2 = new BroadcastRouter(resizer);
        ActorRef actorBroadcastRouter2 = system.actorOf(new Props(TickerWatcherActor.class).withRouter(broadcastRouter2), "router2");

        ActorRef actor4 = system.actorOf(new Props(LimitOrderActor.class).withRouter((RouterConfig) broadcastRouter2));
        ActorRef actor5 = system.actorOf(new Props(LimitOrderActor.class).withRouter((RouterConfig) broadcastRouter2));
        ActorRef actor6 = system.actorOf(new Props(LimitOrderActor.class).withRouter((RouterConfig) broadcastRouter2));

Right now the actor "actorBroadcastRouter2" receives the message, not the intended LimitOrderActor instances actor4, actor5 and actor6. What am I doing wrong?

Edit: I now believe that what I am looking for is the Event Bus, not the BroadcastRouter:

        final ActorRef actor = system.actorOf(new Props(LimitOrderActor.class));
        system.eventStream().subscribe(actor, String.class);

2 Answers

0
votes

Let's close this question; I stumbled over the same problem. This is not how broadcast routers work. The withRouter calls for actor4, actor5 and actor6 each create a new broadcast router instead of adding a routee to the existing broadcast router in actorBroadcastRouter2.

What you want is really simple in Scala with the Listeners trait, which is sadly not available when programming Akka with Java:

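        import akka.actor.{Actor, Props}
        import akka.routing.{Listen, Listeners}

        class MyActor extends Actor with Listeners {
          // listenerManagement comes first so that Listen/Deafen messages are
          // handled before the catch-all case below can swallow them
          def receive = listenerManagement orElse {
            case yourmessages => gossip("To All my listeners")
          }
        }

        // inside another actor; someActorRef is any actor that should be notified
        val myActor = context.actorOf(Props[MyActor])

        myActor ! Listen(someActorRef)

        myActor ! "pigdog"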

In Java you have to write your own version of this, or use DistributedPubSubExtension for a more full-featured solution.

https://groups.google.com/forum/?fromgroups=#!topic/akka-user/NEOY9IxRW5I

https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/routing/Listeners.scala

http://doc.akka.io/docs/akka/snapshot/contrib/distributed-pub-sub.html#a-small-example-in-java
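
For the "write your own version" route, the bookkeeping that the Listeners trait does is small. Below is a minimal, library-free sketch of the listen / deafen / gossip idea in plain Java. It is not Akka API: `Broadcaster`, `listen`, `deafen` and `gossip` are names made up for this illustration, and listeners are modelled as `Consumer<Object>` callbacks. In a real actor the set would presumably hold `ActorRef` instances, the Listen/Deafen requests would arrive as messages, and `gossip` would call `tell` on each listener.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Library-free sketch of the listen / deafen / gossip pattern (not Akka API).
final class ListenerDemo {

    // Hypothetical stand-in for the broadcasting actor.
    static final class Broadcaster {
        // Assumption: inside an actor this set would only be touched while
        // processing a message, so the sketch does no locking.
        private final Set<Consumer<Object>> listeners = new LinkedHashSet<>();

        void listen(Consumer<Object> listener) { listeners.add(listener); }

        void deafen(Consumer<Object> listener) { listeners.remove(listener); }

        void gossip(Object message) {
            // Iterate over a copy so a listener may deafen itself while notified.
            for (Consumer<Object> listener : new ArrayList<>(listeners)) {
                listener.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        Broadcaster ticker = new Broadcaster();
        List<Object> seenByA = new ArrayList<>();
        List<Object> seenByB = new ArrayList<>();
        Consumer<Object> a = seenByA::add;
        Consumer<Object> b = seenByB::add;

        ticker.listen(a);            // A joins first
        ticker.gossip("price=10");
        ticker.listen(b);            // B joins while the broadcaster is running
        ticker.gossip("price=11");
        ticker.deafen(a);            // A leaves again
        ticker.gossip("price=12");

        System.out.println("A saw " + seenByA); // A saw [price=10, price=11]
        System.out.println("B saw " + seenByB); // B saw [price=11, price=12]
    }
}
```

The point of the sketch is only that the set of receivers can change between two broadcasts, which is what the fixed routee list in the question cannot do.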

0
votes

Use the Event Bus, not the BroadcastRouter:

        final ActorRef actor = system.actorOf(new Props(LimitOrderActor.class));
        system.eventStream().subscribe(actor, String.class);
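
To make the idea behind subscribing with a class such as `String.class` more concrete, here is a rough, library-free sketch of a type-keyed publish/subscribe bus in plain Java. It is not the Akka event stream: `TypedBus` and its methods are hypothetical names for this illustration, subscribers are modelled as `Consumer<Object>` callbacks rather than actors, and delivering each event at most once per subscriber is a choice of the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Library-free sketch of class-based subscription (not the Akka event stream).
final class TypedBusDemo {

    static final class TypedBus {
        // channel class -> subscribers interested in instances of that class
        private final Map<Class<?>, Set<Consumer<Object>>> subscribers = new LinkedHashMap<>();

        void subscribe(Consumer<Object> subscriber, Class<?> channel) {
            subscribers.computeIfAbsent(channel, k -> new LinkedHashSet<>()).add(subscriber);
        }

        void unsubscribe(Consumer<Object> subscriber, Class<?> channel) {
            Set<Consumer<Object>> set = subscribers.get(channel);
            if (set != null) {
                set.remove(subscriber);
            }
        }

        void publish(Object event) {
            // Collect matching subscribers first, then deliver once to each.
            Set<Consumer<Object>> targets = new LinkedHashSet<>();
            for (Map.Entry<Class<?>, Set<Consumer<Object>>> entry : subscribers.entrySet()) {
                if (entry.getKey().isInstance(event)) {
                    targets.addAll(entry.getValue());
                }
            }
            for (Consumer<Object> target : targets) {
                target.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        TypedBus bus = new TypedBus();
        List<Object> strings = new ArrayList<>();
        List<Object> numbers = new ArrayList<>();
        Consumer<Object> stringSubscriber = strings::add;
        Consumer<Object> numberSubscriber = numbers::add;

        bus.subscribe(stringSubscriber, String.class);
        bus.subscribe(numberSubscriber, Number.class);

        bus.publish("price changed");  // goes to the String subscriber only
        bus.publish(42);               // Integer is a Number
        bus.publish(3.5);              // Double is a Number

        bus.unsubscribe(stringSubscriber, String.class);
        bus.publish("ignored");        // nobody listens for String any more

        System.out.println(strings);   // [price changed]
        System.out.println(numbers);   // [42, 3.5]
    }
}
```

Subscribers can join and leave at any time, and the publisher does not need to know who they are, which is the property the question was after.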