In the context of an Akka cluster application, I ran into an issue with a property Akka expects: every (case) class and every message used must be serializable. My setup is the following: I want to consume data from a Redis cluster and, for that, I decided to use a cluster-aware router pool so I can add nodes to get more workers. The workers read data from Redis and store some metadata in MongoDB. In a first version, I did this:
object MasterWorkers {

  def props(
    awsBucket: String,
    gapMinValueMicroSec: Long,
    persistentCache: RedisCache,
    mongoURI: String,
    mongoDBName: String,
    mongoCollectionName: String
  ): Props =
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectionName))

  case class JobRemove(deviceId: DeviceId, from: Timestamp, to: Timestamp)
}
case class MasterWorkers(
  awsBucket: String,
  gapMinValueMicroSec: Long,
  persistentCache: RedisCache,
  mongoURI: String,
  mongoDBName: String,
  mongoCollectionName: String
) extends Actor with ActorLogging {

  val workerRouter = context.actorOf(
    FromConfig.props(Props(classOf[Worker], awsBucket, gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectionName)),
    name = "workerRouter")
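For completeness, `FromConfig` needs a matching deployment section in the configuration to make the pool cluster-aware. Mine looks roughly like the sketch below (the actor path and the pool sizes are illustrative, not my exact values):

```hocon
akka.actor.deployment {
  # path of the router actor created by MasterWorkers
  /masterWorkers/workerRouter {
    router = round-robin-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 4
      allow-local-routees = on
    }
  }
}
```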
The Worker class:
object Worker {

  def props(
    awsBucket: String,
    gapMinValueMicroSec: Long,
    replyTo: ActorRef,
    persistentCache: RedisCache,
    mongoURI: String,
    mongoDBName: String,
    mongoCollectionName: String
  ): Props =
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectionName))

  case class JobDumpFailed(deviceId: DeviceId, from: Timestamp, to: Timestamp)
  case class JobDumpSuccess(deviceId: DeviceId, from: Timestamp, to: Timestamp)
  case class JobRemoveFailed(deviceId: DeviceId, from: Timestamp, to: Timestamp)
}
case class Worker(
  awsBucket: String,
  gapMinValueMicroSec: Long,
  replyTo: ActorRef,
  persistentCache: RedisCache,
  mongoURI: String,
  mongoDBName: String,
  mongoCollectionName: String
) extends Actor with ActorLogging {
But this raised the exception below as soon as I started two nodes:
[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894)
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786)
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497)
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl
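The last line is the real cause: to deploy routees on the remote node, Akka serializes the `Props` arguments inside a `DaemonMsgCreate`, and those arguments include the `RedisCache`, which captures the implicit `ActorSystem`, and `ActorSystemImpl` is not serializable. A minimal Akka-free sketch of the same failure, where every name is a stand-in I made up rather than a real Akka class:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for akka.actor.ActorSystemImpl: a field type that is NOT Serializable.
class FakeActorSystem

// Analogue of RedisCache: a serializable case class that captures the system
// through an implicit second parameter list, making it a field.
case class Cache(name: String)(implicit val system: FakeActorSystem)

object SerializationDemo {
  // Java-serializes obj, like the remoting layer does; reports failure as Left.
  def trySerialize(obj: AnyRef): Either[String, Int] =
    try {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(obj)
      oos.close()
      Right(bos.size()) // serialized size in bytes
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    }
}
```

Serializing a `Cache` fails because serialization walks every field, reaches the captured `FakeActorSystem`, and throws `NotSerializableException`, exactly as the remoting layer does when it reaches the `ActorSystem` inside the cache.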
The Redis cache is a simple case class with a companion object, implementing an interface like this:
object RedisCache { /* some static functions */ }

case class RedisCache(
  master: RedisServer,
  slaves: Seq[RedisServer]
)(implicit actorSystem: ActorSystem)
  extends PersistentCache[DeviceKey, BCPPackets] with LazyLogging {
  // some code here
}
To work around the issue, I then moved the redisCache into the worker, so it is no longer passed in from the master node:
case class Worker(
  awsBucket: String,
  gapMinValueMicroSec: Long,
  replyTo: ActorRef,
  mongoURI: String,
  mongoDBName: String,
  mongoCollectionName: String
) extends Actor with ActorLogging {

  // the Redis cache now lives here
  val redisCache = ...
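To make the duplication concrete, here is a small Akka-free sketch of what happens with this design (all names are stand-ins): each routee runs its own constructor, so a pool of five routees builds five distinct caches.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts how many cache instances have been constructed in this JVM.
object CacheCounter { val created = new AtomicInteger(0) }

// Stand-in for the Redis cache: constructing one bumps the counter.
class FakeRedisCache { CacheCounter.created.incrementAndGet() }

// Stand-in for a routee: like the Worker above, it builds its own cache.
class FakeRoutee { val cache = new FakeRedisCache }

object PoolDemo {
  // Simulates a router pool instantiating n routees.
  def spawnPool(n: Int): Seq[FakeRoutee] = Seq.fill(n)(new FakeRoutee)
}
```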
But with this design, every routee creates its own instance of the Redis cache, which is not the behaviour I want. What I want is a single instance of my Redis cache shared by all routees, but in a clustered application this does not seem to be possible, so I don't know whether this is a design flaw or just missing experience with Akka. If anyone has met similar issues, I'll gladly take advice!