I'm trying to implement the Event Bus (Pub-Sub) pattern on top of Akka's actor model.
The "native" EventBus implementation doesn't meet some of my requirements, e.g. the ability to retain only the last message in a topic. That requirement is specific to the MQTT protocol; I'm implementing a message broker for it: https://github.com/butaji/JetMQ
The current interface of my EventBus is the following:
object Bus {
  case class Subscribe(topic: String, actor: ActorRef)
  case class Unsubscribe(topic: String, actor: ActorRef)
  case class Publish(topic: String, payload: Any, retain: Boolean = false)
}
And usage looks like this:
val system = ActorSystem("System")
val bus = system.actorOf(Props[MqttEventBus], name = "bus")
val device1 = system.actorOf(Props(new DeviceActor(bus)))
val device2 = system.actorOf(Props(new DeviceActor(bus)))
All the devices hold a reference to a single Bus actor. The Bus actor is responsible for storing all the subscription and topic state (e.g. retained messages).
Device actors decide for themselves when to Publish, Subscribe, or Unsubscribe to topics.
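To make the state concrete, here is a rough, dependency-free sketch of what the Bus actor keeps track of. It is not the actual JetMQ code: `BusState` and its methods are hypothetical names, and subscribers are modelled as plain `String` ids instead of `ActorRef` so the snippet runs without Akka.

```scala
// Hypothetical model of the bus state (not the real implementation).
// Assumption: subscribers are String ids here instead of ActorRef.
final case class BusState(
    subscribers: Map[String, Set[String]] = Map.empty, // topic -> subscriber ids
    retained: Map[String, Any] = Map.empty             // topic -> last retained payload
) {
  def subscribe(topic: String, id: String): BusState = {
    val current = subscribers.getOrElse(topic, Set.empty[String])
    copy(subscribers = subscribers.updated(topic, current + id))
  }

  def unsubscribe(topic: String, id: String): BusState = {
    val remaining = subscribers.getOrElse(topic, Set.empty[String]) - id
    // Drop the topic entry entirely once nobody is subscribed to it.
    if (remaining.isEmpty) copy(subscribers = subscribers - topic)
    else copy(subscribers = subscribers.updated(topic, remaining))
  }

  // Returns the new state plus the ids that should receive the payload.
  // With retain = true, only the latest payload per topic is kept.
  def publish(topic: String, payload: Any, retain: Boolean = false): (BusState, Set[String]) = {
    val next = if (retain) copy(retained = retained.updated(topic, payload)) else this
    (next, subscribers.getOrElse(topic, Set.empty[String]))
  }
}
```

Every Subscribe, Unsubscribe and Publish has to go through this one piece of state, which is why a single actor owning it ends up processing all of the traffic.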
After some performance benchmarks, I realized that my current design slows down the processing of publications and subscriptions, because:
- My EventBus is effectively a singleton
- That single actor accumulates a huge queue of messages to process
How can I distribute (parallelize) the workload of my event bus implementation? Is the current solution a good fit for akka-cluster?
Currently, I'm thinking about routing through several instances of the Bus, as follows:
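import akka.routing.{BroadcastGroup, RoundRobinGroup}

// Start five bus instances and collect their paths for the group routers
val paths = (1 to 5).map { x =>
  system.actorOf(Props[EventBusActor], name = "event-bus-" + x).path.toString
}
val bus_publisher = system.actorOf(RoundRobinGroup(paths).props())
val bus_manager = system.actorOf(BroadcastGroup(paths).props())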
Where:
- bus_publisher will be responsible for receiving Publish messages,
- bus_manager will be responsible for receiving Subscribe / Unsubscribe messages.
As a result, it should replicate the subscription state across all the bus instances and shorten each actor's queue by distributing the Publish load among them.
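To reason about how the load would be split, here is a small model of the routing idea above, without Akka. It is only a sketch under simplifying assumptions: `Routing` is a hypothetical name, each bus instance is reduced to a counter of messages it has received, and message contents are ignored.

```scala
// Hypothetical model of the proposed routing (no Akka involved).
// received(i) counts the messages that bus instance i had to process.
final case class Routing(received: Vector[Int], next: Int = 0) {
  // Round-robin, like the publisher router: one Publish goes to exactly one instance.
  def publish: Routing =
    copy(
      received = received.updated(next, received(next) + 1),
      next = (next + 1) % received.size
    )

  // Broadcast, like the manager router: one Subscribe/Unsubscribe goes to every instance.
  def manage: Routing = copy(received = received.map(_ + 1))
}

// 10 Publish messages followed by 2 Subscribe messages over 5 instances
val afterLoad =
  (1 to 10).foldLeft(Routing(Vector.fill(5)(0)))((r, _) => r.publish).manage.manage

println(afterLoad.received) // Vector(4, 4, 4, 4, 4)
```

In this model each instance handles 4 messages instead of the 12 a single bus would see, but only the Publish traffic is actually divided: every Subscribe / Unsubscribe still costs one message per instance. The model also does not cover retained messages; as far as I can tell, a Publish with retain = true would be stored only by the instance that happened to receive it.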