I have a parent -> child actor relationship for uploading files to Dropbox. It consists of a supervisor actor and an upload actor. The supervisor actor defines a supervisor strategy for the upload actor, so if an upload to Dropbox fails, the actor should be restarted as long as the maximum number of retries has not been reached. In my application I'm interested in the status of the upload, so I use the ask pattern to receive a future that completes in both the success and the failure case. Below you can find the current implementation of my actors.

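import java.io.ByteArrayInputStream

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Terminated}
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import com.dropbox.core.{DbxException, DbxWriteMode}

import scala.collection.parallel.mutable.ParHashMap
import scala.concurrent.duration._
import scala.language.postfixOps

// PropsContext, DropboxClient, encryptor and the @Inject annotation belong to the project setup and are not shown here.

/**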
 * An upload message.
 *
 * @param byte The byte array representing the content of a file.
 * @param path The path under which the file should be stored.
 */
case class UploadMsg(byte: Array[Byte], path: String)

/**
 * The upload supervisor.
 */
class UploadSupervisor extends Actor {

  /**
   * Stores the sender to the executing actor.
   */
  val senders: ParHashMap[String, ActorRef] = ParHashMap()

  /**
   * Defines the supervisor strategy
   */
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: DbxException => Restart
    case e: Exception => Stop
  }

  /**
   * Handles the received messages.
   */
  override def receive: Actor.Receive = {
    case msg: UploadMsg =>
      implicit val timeout = Timeout(60.seconds)

      val actor = context.actorOf(PropsContext.get(classOf[UploadActor]))
      senders += actor.path.toString -> sender
      context.watch(actor)
      ask(actor, msg).mapTo[Unit] pipeTo sender

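    case Terminated(a) =>
      context.unwatch(a)
      // Notify the original requester, then drop the entry (`-=` mutates the map, plain `-` does not).
      senders.get(a.path.toString).foreach { requester =>
        requester ! akka.actor.Status.Failure(new Exception("Actor terminated"))
      }
      senders -= a.path.toString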
  }
}

/**
 * An actor which uploads a file to Dropbox.
 */
class UploadActor @Inject() (client: DropboxClient) extends Actor {

  /**
   * Sends the message again after restart.
   *
   * @param reason The reason why a restart occurred.
   * @param message The message which caused the restart.
   */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    message foreach { self forward }
  }

  /**
   * Handles the received messages.
   */
  override def receive: Receive = {
    case UploadMsg(byte, path) =>
      val encrypted = encryptor.encrypt(byte)
      val is = new ByteArrayInputStream(encrypted)
      try {
        client.storeFile("/" + path, DbxWriteMode.force(), encrypted.length, is)
        sender ! (())
      } finally {
        is.close()
      }
  }
}

My question now is: is there a better way to tell my application that an upload actor failed after the specified number of retries? In particular, the map that stores the senders for the actors feels a bit awkward.

1 Answer

You should use a CircuitBreaker:

val breaker =
 new CircuitBreaker(context.system.scheduler,
  maxFailures = 5,
  callTimeout = 10.seconds,
  resetTimeout = 1.minute)

and then wrap your calls with the breaker:

sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
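
For an actor that is queried with the ask pattern, the asynchronous variant withCircuitBreaker can be combined with pipeTo so that the caller's future completes in both the success and the failure case. The following is only a minimal sketch under assumptions: Upload, BreakerUploadActor and the upload function are hypothetical stand-ins for UploadMsg, UploadActor and the Dropbox call from the question.

```scala
import akka.actor.Actor
import akka.pattern.{pipe, CircuitBreaker}
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical message, standing in for UploadMsg from the question.
final case class Upload(path: String)

// `upload` is a hypothetical stand-in for the blocking Dropbox call.
class BreakerUploadActor(upload: String => Unit) extends Actor {
  // Execution context for the Future, the breaker and pipeTo.
  import context.dispatcher

  private val breaker = new CircuitBreaker(
    context.system.scheduler,
    maxFailures = 5,
    callTimeout = 10.seconds,
    resetTimeout = 1.minute)

  def receive: Receive = {
    case Upload(path) =>
      // A successful result is piped to the sender as a plain value,
      // a failed one as akka.actor.Status.Failure, which fails the asker's future.
      breaker.withCircuitBreaker(Future(upload(path))).pipeTo(sender())
  }
}
```

With this shape the caller no longer needs a map from child actors to senders, because sender() is resolved while the message is being processed and the reply travels through the piped future.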

The breaker has three states: Closed, Open and HalfOpen. The normal state is Closed; once calls have failed maxFailures times, the state changes to Open. The breaker provides callbacks for state changes, which you can use if you want to react to them, for instance:

breaker onOpen { sender ! FailureMessage()}
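
To make the state transitions concrete, here is a simplified, single-threaded model of them in plain Scala. It is not Akka's implementation: SimpleBreaker, call and attemptReset are hypothetical names, attemptReset stands in for the resetTimeout elapsing, and call timeouts (which Akka also counts as failures) are left out.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical, simplified model of the three breaker states.
sealed trait BreakerState
case object Closed extends BreakerState
case object Open extends BreakerState
case object HalfOpen extends BreakerState

final class SimpleBreaker(maxFailures: Int) {
  private var failures = 0
  private var current: BreakerState = Closed
  private var openListeners: List[() => Unit] = Nil

  def state: BreakerState = current

  // Registers a callback that runs every time the breaker opens.
  def onOpen(callback: => Unit): SimpleBreaker = {
    openListeners = (() => callback) :: openListeners
    this
  }

  // Stands in for the reset timeout elapsing: permit one trial call.
  def attemptReset(): Unit = if (current == Open) current = HalfOpen

  def call[A](body: => A): Try[A] = current match {
    case Open =>
      // Fail fast without running the protected code.
      Failure(new IllegalStateException("breaker is open"))
    case _ =>
      val result = Try(body)
      result match {
        case Success(_) =>
          // Any success resets the failure count and closes the breaker.
          failures = 0
          current = Closed
        case Failure(_) =>
          failures += 1
          // A failed trial call, or too many failures, opens the breaker.
          if (current == HalfOpen || failures >= maxFailures) trip()
      }
      result
  }

  private def trip(): Unit = {
    current = Open
    openListeners.foreach(_.apply())
  }
}
```

While the breaker is Open, calls fail immediately without touching the protected resource; a single successful trial call in HalfOpen closes it again.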