I have a parent -> child actor relationship for uploading files to Dropbox. It consists of a supervisor actor and an upload actor. The supervisor actor defines a supervisor strategy for the upload actor, so if an upload to Dropbox fails, the upload actor is restarted as long as the maximum number of retries has not been reached. My application needs to know the status of the upload, so I use the ask pattern to receive a future that completes on success or failure. Below is the current implementation of my actors.
import java.io.ByteArrayInputStream
import javax.inject.Inject

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Terminated}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import com.dropbox.core.{DbxException, DbxWriteMode}

import scala.collection.parallel.mutable.ParHashMap
import scala.concurrent.duration._

/**
 * An upload message.
 *
 * @param byte The byte array representing the content of a file.
 * @param path The path under which the file should be stored.
 */
case class UploadMsg(byte: Array[Byte], path: String)

/**
 * The upload supervisor.
 */
class UploadSupervisor extends Actor {

  // Execution context required by `pipeTo`.
  import context.dispatcher

  /**
   * Maps the path of each upload actor to the original sender of its request.
   */
  val senders: ParHashMap[String, ActorRef] = ParHashMap()

  /**
   * Defines the supervisor strategy.
   */
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: DbxException => Restart
    case _: Exception => Stop
  }

  /**
   * Handles the received messages.
   */
  override def receive: Actor.Receive = {
    case msg: UploadMsg =>
      implicit val timeout: Timeout = Timeout(60.seconds)
      val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
      senders += actor.path.toString -> sender
      context.watch(actor)
      ask(actor, msg).mapTo[Unit] pipeTo sender
    case Terminated(a) =>
      context.unwatch(a)
      senders.get(a.path.toString).foreach { originalSender =>
        originalSender ! akka.actor.Status.Failure(new Exception("Actor terminated"))
        // `-=` removes the entry in place; a plain `-` would only return a new map.
        senders -= a.path.toString
      }
  }
}

/**
 * An actor which uploads a file to Dropbox.
 */
class UploadActor @Inject() (client: DropboxClient) extends Actor {

  /**
   * Sends the message again after restart.
   *
   * @param reason The reason why a restart occurred.
   * @param message The message which caused the restart.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    message.foreach(self.forward(_))
  }

  /**
   * Handles the received messages.
   */
  override def receive: Receive = {
    case UploadMsg(byte, path) =>
      // `encryptor` is not defined in this snippet; it comes from elsewhere in the application.
      val encrypted = encryptor.encrypt(byte)
      val is = new ByteArrayInputStream(encrypted)
      try {
        client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
        sender ! (())
      } finally {
        is.close()
      }
  }
}
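For context, the calling side uses the ask pattern roughly as in the following minimal sketch. The names `UploadClient` and `upload` are hypothetical and only for illustration; the sketch assumes the supervisor replies with `()` on success and with `akka.actor.Status.Failure` on failure, as in the code above.

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical helper, not part of the original application.
object UploadClient {

  /**
   * Asks the supervisor to perform an upload and returns a future that
   * succeeds with Unit or fails with the exception sent back by the supervisor.
   */
  def upload(supervisor: ActorRef, msg: Any)(implicit ec: ExecutionContext): Future[Unit] = {
    // Assumed timeout; it mirrors the 60 seconds used inside the supervisor.
    implicit val timeout: Timeout = Timeout(60.seconds)

    val result = (supervisor ? msg).mapTo[Unit]

    // A Status.Failure reply (or an ask timeout) completes the future with a failure.
    result.onComplete {
      case Success(_) => println("Upload succeeded")
      case Failure(e) => println(s"Upload failed: ${e.getMessage}")
    }
    result
  }
}
```

The application would call something like `UploadClient.upload(supervisor, UploadMsg(bytes, path))` and react to the returned future.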
My question is: is there a better way to tell my application that an upload actor failed after the specified number of retries? In particular, the map that stores the sender for each upload actor feels a bit awkward.