In an Akka cluster application, I ran into an issue with one of Akka's requirements: every (case) class and every message used must be serializable. Here is the context: I want to consume data from a Redis cluster, and to do so I decided to use a cluster-aware router pool so that I can add nodes to get more workers. The workers read data from Redis and store some metadata in MongoDB. In my first version, I did this:

object MasterWorkers {

  def props
  (  awsBucket : String,
     gapMinValueMicroSec : Long,
     persistentCache: RedisCache,
     mongoURI : String,
     mongoDBName : String,
     mongoCollectioName : String
  ) : Props =
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobRemove(deviceId: DeviceId, from : Timestamp, to : Timestamp)
}

case class MasterWorkers
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  persistentCache: RedisCache,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

  val workerRouter =
    context.actorOf(FromConfig.props(Props(classOf[Worker],awsBucket,gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)),
    name = "workerRouter")

The Worker class:

object Worker {

  def props
  (
    awsBucket : String,
    gapMinValueMicroSec : Long,
    replyTo : ActorRef,
    persistentCache: RedisCache,
    mongoURI : String,
    mongoDBName : String,
    mongoCollectioName : String
  ) : Props =
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobDumpFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
  case class JobDumpSuccess(deviceId : DeviceId, from: Timestamp, to: Timestamp)

  case class JobRemoveFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
}

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  persistentCache: RedisCache,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

But this raises the exception below when I start two nodes:

[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894)
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786)
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497)
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl

The Redis cache is a simple case class, with a companion object, that implements an interface like this:

object RedisCache { /* some static functions */ }

case class RedisCache
(
  master : RedisServer,
  slaves : Seq[RedisServer]
)(implicit actorSystem : ActorSystem)
  extends PersistentCache[DeviceKey, BCPPackets] with LazyLogging {
// some code here
}

To solve the issue, I moved the Redis cache into the worker, so I no longer pass it in from the master:

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

// redis cache here now 
val redisCache = ...

But with this design, every routee creates a new instance of the Redis cache, which is not the behaviour I expect. What I want is a single instance of my Redis cache, shared by all my routees. In a cluster application that does not seem to be possible, so I don't know whether this is a design flaw or a lack of experience with Akka on my part. If anyone has run into similar issues, I would gladly take any advice!

1 Answer

The problem is that your RedisCache is not that simple. It carries around an ActorSystem, which cannot be serialized.

I guess this is because it contains RedisClient instances from a library such as rediscala, and these require an ActorSystem.
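The failure can be reproduced without Akka. The stack trace ends in a java.io.NotSerializableException, so plain Java serialization is enough for a minimal sketch. All names below (FakeSystem, Endpoint, Cache, SerializationCheck) are hypothetical stand-ins, not part of the original code; FakeSystem plays the role of the ActorSystem and Cache mirrors the shape of RedisCache.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ActorSystem: a plain class that is not Serializable.
class FakeSystem

// Case classes are Serializable as long as all of their fields are.
case class Endpoint(host: String, port: Int)

// Mirrors RedisCache: serializable data plus an implicit system.
// `val` forces the implicit parameter to be kept as a field, which is what
// happens in practice when the class body uses it.
case class Cache(master: Endpoint)(implicit val system: FakeSystem)

object SerializationCheck {
  // Returns the serialized size on success, or the offending class name on failure.
  def trySerialize(value: AnyRef): Either[String, Int] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try {
      out.writeObject(value)
      out.flush()
      Right(buffer.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: FakeSystem = new FakeSystem
    println(trySerialize(Endpoint("localhost", 6379)))        // Right(...): plain data
    println(trySerialize(Cache(Endpoint("localhost", 6379)))) // Left(...): drags FakeSystem along
  }
}
```

The plain Endpoint serializes, while Cache fails because of the system it holds. The same thing happens when the Props arguments for a remotely deployed routee include the RedisCache.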

You will need to abstract away from the actor system and pass your workers only the bare details of the Redis cluster (i.e. the RedisServer objects).

The workers will then instantiate the RedisClient themselves - using their context.system.

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  redisMaster: RedisServer,
  redisSlaves: Seq[RedisServer],
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

  val masterSlaveClient = ??? //create from the RedisServer details

}

This will allow each worker to set up its own connection to the Redis cluster.
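A minimal sketch of that first option, assuming the Redis library is rediscala and that its RedisClientMasterSlaves(master, slaves) constructor takes an implicit ActorSystem (check this against the version you use). The parameter list is shortened for illustration, and the Worker is written as a plain class rather than a case class.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import redis.{RedisClientMasterSlaves, RedisServer}

object Worker {
  // Only serializable values (an ActorRef and plain connection details) go
  // into Props, so the router can ship them to a remote node.
  def props(replyTo: ActorRef, redisMaster: RedisServer, redisSlaves: Seq[RedisServer]): Props =
    Props(classOf[Worker], replyTo, redisMaster, redisSlaves)
}

class Worker(replyTo: ActorRef, redisMaster: RedisServer, redisSlaves: Seq[RedisServer])
  extends Actor with ActorLogging {

  // The ActorSystem of the node this routee was deployed on; it is looked up
  // locally and never crosses the wire.
  private implicit val system: ActorSystem = context.system

  // Assumed rediscala API: builds the client from the bare server details.
  private val redis = RedisClientMasterSlaves(redisMaster, redisSlaves)

  // Message handling is omitted in this sketch.
  def receive: Receive = Actor.emptyBehavior
}
```

With this layout each routee still opens its own client. If one client per node is what you want, one common place to hold it is an Akka Extension, which is created once per ActorSystem and can be looked up from any actor on that node.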

Alternatively, if you want to connect only once in your master and share the connection with your workers, you need to pass around the RedisClientActor that embeds your connection. This is an ActorRef and can be shared remotely.

This can be obtained by calling client.redisConnection.

The workers can then build an ActorRequest around it, for example:

case class Worker
    (
      awsBucket : String,
      gapMinValueMicroSec : Long,
      replyTo : ActorRef,
      redisConnection: ActorRef,
      mongoURI : String,
      mongoDBName : String,
      mongoCollectioName : String
    ) extends Actor with ActorLogging with ActorRequest {

      // you will need to implement the execution context that ActorRequest needs as well..

      send(redisCommand)

    }