I’ve been trying to wrap my head around the concepts of Akka and actor-based systems recently. While I have a pretty good understanding of the Akka fundamentals by now, I’m still struggling with a few things when it comes to clustering and remote actors.

I’ll try to illustrate the issue using the WebSocket chat example that comes with Play Framework 2.0: There's an actor that holds the WebSockets and keeps a list of the currently connected users. The actor basically represents the chat room both technically and logically. This works perfectly fine as long as there’s a single chat room running on a single server.

Now I'm trying to understand how this example would have to be extended when we are talking about many dynamic chat rooms (new rooms can be opened/closed at any time) running on a cluster of servers (with single nodes being added or removed according to current demand). In such a case user A could connect to server 1 while user B connects to server 2. Both might be talking on the same chat room. On each server there would still be an actor (for each chat room?) that holds the WebSocket instances to receive and publish events (messages) to the right users. But logically there should only be one chat room actor on either server 1 or server 2 that holds the list of currently connected users (or similar tasks).

How would you go about achieving this, preferably in “pure Akka” and without adding an additional messaging system like ZeroMQ or RabbitMQ?

This is what I’ve come up with so far, please let me know whether this makes any sense:

  1. User A connects to server 1 and an actor is allocated that holds his WebSocket.
  2. The actor checks (using Router? EventBus? Something else?) whether a “chat room actor” for the active chat room exists on any of the connected cluster nodes. Since it doesn't, it requests the creation of a new chat room actor somehow and from then on sends and receives chat messages to/from this actor.
  3. User B connects on server 2 and an actor is allocated for his WebSocket as well.
  4. It also checks whether an actor for the requested chat room exists somewhere and finds it on server 1.
  5. The chat room actor on server 1 now acts as the hub for the given chat room, sending messages to all “connected” chat member actors and distributing incoming ones.

If server 1 goes down, the chat room actor would have to be re-created on (or moved to) server 2 somehow, although this is not my primary concern right now. I'm wondering most about how this dynamic discovery of actors spread across various, basically independent machines could be done using Akka’s toolset.

I’ve been looking at Akka’s documentation for quite some time now, so maybe I’m missing the obvious here. If so, please enlighten me :-)

3 Answers

13
votes

I'm working on a private project which is basically a very extended version of the chatroom example and I also had startup problems with akka and the whole "decentralized" thinking. So I can tell you how I "solved" my extended chatroom:

I wanted a server which could easily be deployed multiple times without much additional configuration. I am using redis as the storage for all open user sessions (simple serialization of their ActorRefs) and for all chatrooms.

The server has the following actors:

  • WebsocketSession: which holds the connection to one user and handles requests from the user and forwards messages from the system.
  • ChatroomManager: this is the central broadcaster, which is deployed on every instance of the server. If a user wants to send a message to a chatroom, the WebsocketSession actor sends all the information to the ChatroomManager actor, which then broadcasts the message to all members of the chatroom.

So here is my procedure:

  1. User A connects to server 1 which allocates a new WebsocketSession. This actor inserts the absolute path to this actor into redis.
  2. User A joins a chatroom X which also inserts his absolute path (I use this as the unique ID of a user session) into redis (each chatroom has a "connections" set)
  3. User B connects to server 2 -> redis
  4. User B joins chatroom X -> redis
  5. User B sends a message to chatroom X as follows: user B sends his message over the WebSocket to his session actor, which (after some checks) sends an actor message to the ChatroomManager. This actor retrieves the user list of the chatroom from redis (absolute paths used with akka's actorFor method) and then sends the message to each session actor. These session actors then write to their WebSockets.
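The procedure above can be sketched without Akka or redis to make the data flow visible. This is only a minimal sketch under stated assumptions: `FakeRedis` is an in-memory stand-in for the two set operations used, the key names (`sessions`, `chatroom:<name>:connections`) and the example paths are hypothetical, and `resolve` stands in for looking an actor up by its absolute path.

```python
class FakeRedis:
    """In-memory stand-in for the two Redis set operations used below."""
    def __init__(self):
        self.sets = {}

    def sadd(self, key, member):
        self.sets.setdefault(key, set()).add(member)

    def smembers(self, key):
        return set(self.sets.get(key, ()))


class WebsocketSession:
    """One per connected user; its absolute path doubles as the session ID."""
    def __init__(self, path, store):
        self.path = path
        self.store = store
        self.websocket_out = []          # stands in for the real WebSocket
        store.sadd("sessions", path)     # step 1: register the session

    def join(self, room):
        # step 2: add this session to the room's "connections" set
        self.store.sadd("chatroom:" + room + ":connections", self.path)

    def deliver(self, room, text):
        self.websocket_out.append((room, text))


class ChatroomManager:
    """Broadcaster deployed on every server; it holds no room state itself."""
    def __init__(self, store, resolve):
        self.store = store
        self.resolve = resolve           # path -> session reference
        self.ref_cache = {}              # the reference caching mentioned in the answer

    def broadcast(self, room, text):
        for path in sorted(self.store.smembers("chatroom:" + room + ":connections")):
            if path not in self.ref_cache:
                self.ref_cache[path] = self.resolve(path)
            self.ref_cache[path].deliver(room, text)


store = FakeRedis()
actors = {}                              # hypothetical path -> session lookup table

def resolve(path):
    return actors[path]

a = WebsocketSession("akka://chat@10.0.0.1:2552/user/session-a", store)
b = WebsocketSession("akka://chat@10.0.0.2:2552/user/session-b", store)
actors[a.path] = a
actors[b.path] = b
a.join("X")
b.join("X")

manager_on_server2 = ChatroomManager(store, resolve)
manager_on_server2.broadcast("X", "hello from B")
```

Because every manager reads the member list from the shared store, any server can broadcast to any room; the price is a store lookup per message, which the cache only partly offsets.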

In each ChatroomManager actor I do some ActorRef caching, which gave additional speed. I think this differs from your approach, especially in that these ChatroomManagers handle requests for all chatrooms. But having one actor per chatroom is a single point of failure, which I wanted to avoid. Furthermore, it would cause a lot more messages, e.g.:

  • User A and user B are on server 1.
  • Chatroom X is on server 2.

If user A wants to talk to user B, both would have to communicate via the chatroom actor on server 2.

In addition, I used akka's routers (round-robin) to create multiple instances of the ChatroomManager actor on each system to handle many requests.

I spent some days setting up the whole akka remote infrastructure in combination with serialization and redis. But now I am able to create any number of instances of the server application, which use redis to share their ActorRefs (serialized as absolute paths with ip+port).

This may help you a little bit further, and I'm open to new questions (just not about my English ;).

10
votes

The key to scaling out across multiple machines is to keep mutable state as isolated as possible. Although you can use a distributed cache to coordinate state across all nodes, this would give you synchronization as well as bottleneck issues when scaling out to a large number of nodes. Ideally then, there should be a single actor knowing about the messages and participants in a chat room.

If a chat room is represented by a single actor running on a single machine, the core of your problem is finding out where that actor lives - or indeed whether such a room exists at all. The trick is to route requests related to a given chat room using an identifier, such as the name of the chat room. Compute the hash of the name and, depending on the resulting number, pick one of your n boxes. That node will know about its current chat rooms and can safely find or create the correct chat room actor for you.
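To make the routing idea concrete, here is a minimal, framework-free sketch. The node addresses, the `Node` class and the `join` helper are hypothetical; a set of user names stands in for the chat room actor. One detail worth noting is that the hash has to give the same result on every machine, so the sketch uses SHA-256 instead of a per-process salted hash.

```python
import hashlib


def stable_hash(name):
    """Hash that is identical on every machine and in every process."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def node_for_room(room_name, nodes):
    """Hash the room name and take it modulo the number of boxes."""
    return nodes[stable_hash(room_name) % len(nodes)]


class Node:
    """Stand-in for one server; it owns the rooms that hash to it."""
    def __init__(self, address):
        self.address = address
        self.rooms = {}   # room name -> set of users (stand-in for a room actor)

    def find_or_create_room(self, room_name):
        # Only this node decides about its rooms, so no cross-node locking is needed.
        return self.rooms.setdefault(room_name, set())


nodes = ["server1:2552", "server2:2552", "server3:2552"]   # hypothetical addresses
cluster = {address: Node(address) for address in nodes}


def join(room_name, user):
    owner = cluster[node_for_room(room_name, nodes)]
    owner.find_or_create_room(room_name).add(user)
    return owner.address


first = join("scala-talk", "userA")    # could arrive via any server
second = join("scala-talk", "userB")   # still resolves to the same owner
```

Plain modulo hashing moves most rooms to a different box whenever n changes, so a real deployment would need some migration or a consistent-hashing scheme on top of this.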

You may take a look at the following blog posts discussing clustering and scaling out in Akka:

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-1/

http://blog.softmemes.com/2012/06/16/clustered-akka-building-akka-2-2-today-part-2/

7
votes

I would use Zookeeper+Norbert to know about which hosts are going up and down:

http://www.ibm.com/developerworks/library/j-zookeeper/

Now every node in my chatroom server farm can know all the hosts in the logical cluster. They will get a callback when a node goes offline (or comes online). Any node can now keep a sorted list of current cluster members, hash the chatroom ID, and mod by the list size to get the index within the list which is the node which should host any given chatroom. We can add 1 and rehash to pick a second index (requires a loop until you get a fresh index) to compute the second host to hold a second copy of the chatroom for redundancy. On each of the two chatroom hosts is a chatroom actor which just forwards all chat messages to each Websocket actor which is a chatroom member.
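A minimal sketch of that host selection, independent of Zookeeper, Norbert and Akka. How exactly to "add 1 and rehash" is my interpretation: the sketch appends an attempt counter to the chatroom ID before hashing, and the `hosts_for_room` name, the retry limit and the fallback to the neighbouring index are assumptions.

```python
import hashlib


def stable_hash(text):
    """Hash that gives the same value on every node."""
    return int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:8], "big")


def hosts_for_room(room_id, members, max_attempts=64):
    """Return [primary, secondary] for a chatroom, identical on every node."""
    ordered = sorted(members)           # every node sorts the same member list
    if len(ordered) < 2:
        return ordered                  # no redundancy possible
    primary = stable_hash(room_id) % len(ordered)
    secondary = primary
    for attempt in range(1, max_attempts + 1):
        # Rehash with a counter until the index differs from the primary.
        secondary = stable_hash(room_id + "#" + str(attempt)) % len(ordered)
        if secondary != primary:
            break
    else:
        # Assumed fallback so the function always terminates.
        secondary = (primary + 1) % len(ordered)
    return [ordered[primary], ordered[secondary]]


members = ["node-c", "node-a", "node-b"]
pair = hosts_for_room("room-42", members)
```

Since the result depends only on the sorted member list and the chatroom ID, any two nodes that agree on the membership compute the same pair without talking to each other.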

Now we can send chat messages via both active chatroom actors with a custom Akka router. A client just sends out the message once and the router will do the hash mods and send to the two remote chatroom actors. I would use Twitter's Snowflake algorithm to generate unique 64-bit IDs for the messages being sent. See the algorithm in the nextId() method of the code at the following link. The datacenterId and workerId can be set using the norbert properties to ensure that non-colliding IDs are generated on the different servers:

https://github.com/twitter/snowflake/blob/master/src/main/scala/com/twitter/service/snowflake/IdWorker.scala

Now two copies of every message will go to each client endpoint via each of the two active chatroom actors. At each Websocket client actor I would unmask the snowflake IDs to recover the datacenterId+workerId of the host that sent the message and keep track of the highest chat message number seen from each host in the cluster. Then I would ignore any messages which are not higher than what has already been seen at the given client for a given sender host. This would deduplicate the pair of messages coming in via the two active chatroom actors.
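A minimal sketch of the deduplication step. The bit layout (12 sequence bits, 5 worker bits, 5 datacenter bits, timestamp above them) follows the linked IdWorker code; `make_id` is a simplified stand-in for nextId() that takes the timestamp and sequence as arguments, and the `Deduplicator` class is hypothetical.

```python
SEQUENCE_BITS = 12
WORKER_BITS = 5
DATACENTER_BITS = 5

WORKER_SHIFT = SEQUENCE_BITS                                     # 12
DATACENTER_SHIFT = SEQUENCE_BITS + WORKER_BITS                   # 17
TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS  # 22


def make_id(timestamp_ms, datacenter_id, worker_id, sequence):
    """Compose an ID the way Snowflake does (timestamp already epoch-adjusted)."""
    return ((timestamp_ms << TIMESTAMP_SHIFT)
            | (datacenter_id << DATACENTER_SHIFT)
            | (worker_id << WORKER_SHIFT)
            | sequence)


def sender_of(message_id):
    """Mask out (datacenterId, workerId), which identifies the sending host."""
    worker = (message_id >> WORKER_SHIFT) & ((1 << WORKER_BITS) - 1)
    datacenter = (message_id >> DATACENTER_SHIFT) & ((1 << DATACENTER_BITS) - 1)
    return (datacenter, worker)


class Deduplicator:
    """Kept per Websocket client actor: highest ID seen from each sender host."""
    def __init__(self):
        self.highest_seen = {}

    def accept(self, message_id):
        sender = sender_of(message_id)
        if message_id <= self.highest_seen.get(sender, -1):
            return False                 # already seen via the other chatroom actor
        self.highest_seen[sender] = message_id
        return True


dedup = Deduplicator()
m1 = make_id(1000, 1, 3, 0)
m2 = make_id(1000, 1, 3, 1)
from_other_host = make_id(1000, 1, 4, 0)
```

This relies on each host's IDs arriving in increasing order on each of the two paths; a message that is overtaken by a later one from the same host would be dropped as if it were a duplicate.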

So far so good; we would have resilient messaging in that if any node dies we won't lose the one surviving copy of the chatrooms. Messages will flow uninterrupted via the second chatroom automatically.

Next up we need to deal with nodes dropping out of the cluster or being added back into the cluster. We will get a norbert callback within each node to notify us about cluster membership changes. On this callback we can send out an akka message via the custom router stating the new membership list and the current hostname. The custom router on the current host will see that message and update its state to know about the new cluster membership to compute the new pair of nodes to send any given chatroom traffic via. This acknowledgement of the new cluster membership will be sent by the router to all nodes so that every server can track when all servers have caught up with the membership change and are now sending messages correctly.

The surviving chatroom may still be active after the membership change, in which case all routers on all nodes will continue sending to it as normal but will also send a message speculatively to the new second chatroom host. That second chatroom may not yet be up, but that's not a problem as messages will flow via the survivor. If the surviving chatroom is no longer to be active after the membership change, all routers on all hosts will at first send to three hosts: the survivor and the two new nodes. The akka death watch mechanism can be used so that all nodes can eventually see the surviving chatroom shut down and get back to routing chat traffic via two hosts.
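The target selection during such a transition can be sketched as follows. This is a hypothetical, Akka-free model: `RoomRoute` stands in for the per-chatroom state of the custom router, and `terminated` stands in for the notification that death watch would deliver.

```python
class RoomRoute:
    """Per-chatroom routing state kept by the (hypothetical) custom router."""
    def __init__(self, hosts):
        self.hosts = list(hosts)   # the hosts that should hold the chatroom
        self.draining = None       # old survivor that is still being sent to

    def membership_changed(self, new_hosts, survivor):
        self.hosts = list(new_hosts)
        # Keep sending to the survivor if it is not one of the new hosts.
        self.draining = survivor if survivor not in new_hosts else None

    def terminated(self, host):
        # Death-watch style notification: the old survivor has shut down.
        if host == self.draining:
            self.draining = None

    def targets(self):
        extra = [self.draining] if self.draining is not None else []
        return self.hosts + extra


route = RoomRoute(["node-a", "node-b"])

# node-b died; node-a survives and stays one of the two hosts.
route.membership_changed(["node-a", "node-c"], survivor="node-a")
still_two = route.targets()

# A later change moves the room away from node-a completely.
route.membership_changed(["node-c", "node-d"], survivor="node-a")
three = route.targets()

route.terminated("node-a")
back_to_two = route.targets()
```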

Next up we need to migrate the chatroom from the surviving server onto the one or two new hosts, depending on the circumstances. The surviving chatroom actor will at some point get a message telling it about the new cluster membership. It will start by sending a copy of the chatroom membership to the new nodes. This message will create the new copy of the chatroom actor with the correct membership on the new nodes. If the survivor is no longer one of the two nodes which should hold the chatroom, it will go into decommissioning mode. In decommissioning mode it will only forward messages on to the new primary and secondary nodes, not to any chatroom members. Akka message forwarding is perfect for this.

A decommissioning chatroom will listen out for the norbert cluster membership acknowledgement messages from each node. Eventually it will see that all nodes within the cluster have acknowledged the new cluster membership. It then knows it will no longer receive any further messages to forward. It can then kill itself. Akka hotswapping is perfect to implement the decommissioning behaviour.
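The decommissioning behaviour can be sketched as a small state machine. This is a hypothetical model without Akka: swapping `self.handle` imitates hotswapping the actor's behaviour, the `delivered` and `forwarded` lists stand in for real message sends, and all names are assumptions.

```python
class ChatRoom:
    """Chatroom whose message handler is swapped when it is decommissioned."""
    def __init__(self, members):
        self.members = set(members)
        self.delivered = []              # (member, text) sent to Websocket actors
        self.forwarded = []              # (host, text) passed on to the new hosts
        self.new_hosts = []
        self.pending_acks = set()
        self.stopped = False
        self.handle = self._active       # current behaviour

    def _active(self, text):
        for member in sorted(self.members):
            self.delivered.append((member, text))

    def _decommissioning(self, text):
        # Only forward to the new primary and secondary, never to members.
        for host in self.new_hosts:
            self.forwarded.append((host, text))

    def decommission(self, new_hosts, cluster_nodes):
        self.new_hosts = list(new_hosts)
        self.pending_acks = set(cluster_nodes)
        self.handle = self._decommissioning   # swap the behaviour

    def membership_acked(self, node):
        """A node has confirmed that it routes with the new membership."""
        if self.handle != self._decommissioning:
            return
        self.pending_acks.discard(node)
        if not self.pending_acks:
            self.stopped = True          # nothing left to forward, safe to stop


room = ChatRoom(["userA", "userB"])
room.handle("before the change")
room.decommission(new_hosts=["node-c", "node-d"],
                  cluster_nodes=["node-a", "node-c", "node-d"])
room.handle("during the change")
room.membership_acked("node-a")
room.membership_acked("node-c")
stopped_early = room.stopped
room.membership_acked("node-d")
```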

So far so good; we have a resilient messaging setup which won't lose messages when a node crashes. At the point where the cluster membership changes we will get a spike of internode traffic to copy chatrooms to new nodes. We also have a residual flurry of internode message forwarding until all servers have caught up with which chatrooms have moved to which servers. If we want to scale up the system we can wait until a low point in user traffic and just turn on a new node. The chatrooms would get redistributed across the new nodes automatically.

The above description is based on reading the following paper and translating it into akka concepts:

https://www.dropbox.com/s/iihpq9bjcfver07/VLDB-Paper.pdf