I'm trying to implement the Event Bus (pub-sub) pattern on top of Akka's actor model.

The "native" EventBus implementation doesn't meet some of my requirements, e.g. the ability to retain only the last message in a topic. That requirement is specific to the MQTT protocol; I'm implementing a message broker for it: https://github.com/butaji/JetMQ.

The current interface of my EventBus is the following:

import akka.actor.ActorRef

object Bus {
  case class Subscribe(topic: String, actor: ActorRef)
  case class Unsubscribe(topic: String, actor: ActorRef)
  case class Publish(topic: String, payload: Any, retain: Boolean = false)
}

And usage looks like this:

val system = ActorSystem("System")
val bus = system.actorOf(Props[MqttEventBus], name = "bus")
val device1 = system.actorOf(Props(new DeviceActor(bus)))
val device2 = system.actorOf(Props(new DeviceActor(bus)))

All the devices hold a reference to a single Bus actor. The Bus actor is responsible for storing all the state of subscriptions and topics (e.g. retained messages).
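A minimal sketch of the state described above, written as a plain immutable structure so it can be read without Akka. The names `BusState` and `Message` are hypothetical and not taken from JetMQ; `S` stands in for the subscriber handle (an `ActorRef` in the actor version). Replaying the retained message on subscribe is assumed here because that is how MQTT retained messages behave.

```scala
// Hypothetical model of the state a bus has to keep; not the JetMQ implementation.
final case class Message(topic: String, payload: Any, retain: Boolean = false)

final case class BusState[S](
    subscribers: Map[String, Set[S]] = Map.empty[String, Set[S]],
    retained: Map[String, Message] = Map.empty[String, Message]) {

  // Register s on the topic and return the retained message it should receive, if any.
  def subscribe(topic: String, s: S): (BusState[S], Option[Message]) = {
    val subs = subscribers.getOrElse(topic, Set.empty[S]) + s
    (copy(subscribers = subscribers.updated(topic, subs)), retained.get(topic))
  }

  // Remove s from the topic; unknown topics and subscribers are ignored.
  def unsubscribe(topic: String, s: S): BusState[S] = {
    val subs = subscribers.getOrElse(topic, Set.empty[S]) - s
    copy(subscribers = subscribers.updated(topic, subs))
  }

  // Keep only the latest retained message per topic and return the current recipients.
  def publish(m: Message): (BusState[S], Set[S]) = {
    val next = if (m.retain) copy(retained = retained.updated(m.topic, m)) else this
    (next, subscribers.getOrElse(m.topic, Set.empty[S]))
  }
}
```

In the single-bus design, every one of these operations runs inside one actor, which is why all traffic ends up in the same mailbox.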

Each device actor decides for itself when to Publish, Subscribe, or Unsubscribe to topics.

After some performance benchmarks, I realized that my current design adds latency between a Publish and its delivery to subscribers, for two reasons:

  1. My EventBus is effectively a singleton.
  2. As a result, a huge queue of messages builds up in its mailbox.

How can I distribute (parallelize) the workload for my event bus implementation? Is the current solution a good fit for akka-cluster?

Currently, I'm thinking about routing through several instances of the Bus, as follows:

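import akka.actor.Props
import akka.routing.{BroadcastGroup, RoundRobinGroup}

// Start five bus instances and collect their actor paths for the group routers.
val paths = (1 to 5).map { x =>
  system.actorOf(Props[EventBusActor], name = "event-bus-" + x).path.toString
}

val bus_publisher = system.actorOf(RoundRobinGroup(paths).props())
val bus_manager = system.actorOf(BroadcastGroup(paths).props())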

Where:

  • bus_publisher receives Publish messages and hands each one to a single bus,
  • bus_manager receives Subscribe / Unsubscribe messages and broadcasts them to every bus.

This way, subscription state is replicated across all the buses, and the queue per actor shrinks because the publish load is distributed.

How many subscribers and publishers do you have? - Romain Hippeau
@RomainHippeau Up to several thousand per node. Each deviceActor is actually a shadow of a TCP connection and typically has a few subscriptions to different topics. - Vitaly Baum
Maybe you are simply running up against the limits of the pub/sub paradigm. It is not very scalable. - Romain Hippeau

1 Answer

You could route inside your singleton bus instead of outside it. The bus would be responsible for routing messages and establishing topics, while child actors would be responsible for distributing the messages. Here is a basic example of what I'm describing, without unsubscribe functionality, duplicate-subscription checks, or supervision:

import scala.collection.mutable
import akka.actor.{Actor, ActorRef, Props}
import Bus._ // Subscribe and Publish from the question; adjust to wherever Bus lives

class HashBus extends Actor {
  // One child DistributionActor per topic, created on first subscription.
  val topicActors = mutable.Map.empty[String, ActorRef]

  def createDistributionActor(): ActorRef =
    context.actorOf(Props[DistributionActor])

  override def receive: Receive = {
    case subscribe: Subscribe =>
      topicActors.getOrElseUpdate(subscribe.topic, createDistributionActor()) ! subscribe

    case publish: Publish =>
      // A topic nobody has subscribed to has no child actor, so the message is dropped.
      topicActors.get(publish.topic).foreach(_ ! publish)
  }
}

class DistributionActor extends Actor {
  // ListBuffer is mutable; += appends in place.
  val recipients = mutable.ListBuffer.empty[ActorRef]

  override def receive: Receive = {
    case Subscribe(_, actorRef) =>
      recipients += actorRef

    case publish: Publish =>
      recipients.foreach(_ ! publish)
  }
}

This should keep your bus actor's mailbox from saturating, because the bus only has to do a hash lookup and forward each message. The DistributionActors are responsible for iterating over the recipients and distributing the payload. Similarly, each DistributionActor could retain any state for its topic.
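As a rough sketch of that last point, a per-topic actor could keep the last retained message and replay it to new subscribers. The class name `RetainingDistributionActor` is hypothetical; the `Bus` messages are copied from the question so the block stands alone. Replaying on every Subscribe is an assumption based on MQTT retained-message behaviour.

```scala
import scala.collection.mutable
import akka.actor.{Actor, ActorRef}

// Message protocol copied from the question.
object Bus {
  case class Subscribe(topic: String, actor: ActorRef)
  case class Unsubscribe(topic: String, actor: ActorRef)
  case class Publish(topic: String, payload: Any, retain: Boolean = false)
}

// Hypothetical variant of DistributionActor that also owns the topic's state.
class RetainingDistributionActor extends Actor {
  import Bus._

  // A set silently ignores duplicate subscriptions from the same actor.
  private val recipients = mutable.Set.empty[ActorRef]
  // Last message published with retain = true, if any.
  private var retained: Option[Publish] = None

  override def receive: Receive = {
    case Subscribe(_, actorRef) =>
      recipients += actorRef
      // Replay the retained message to the new subscriber only.
      retained.foreach(actorRef ! _)

    case Unsubscribe(_, actorRef) =>
      recipients -= actorRef

    case publish: Publish =>
      if (publish.retain) retained = Some(publish)
      recipients.foreach(_ ! publish)
  }
}
```

For this to work with the HashBus above, the bus would also need a case that forwards Unsubscribe to the topic's child actor, and a retained Publish would have to create the child even when the topic has no subscribers yet.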