0 votes

I am currently trying to modify the source code of a simple DistBelief framework implemented with Akka Actors. The original source code is here: http://alexminnaar.com/implementing-the-distbelief-deep-neural-network-training-framework-with-akka.html . The original implementation uses plain Akka Actors, but I want to extend it to a distributed mode, and I think Akka Cluster Sharding is the right option for this task. What I am wondering is where incoming messages should properly be handled: in the receive() method, or in extractShardId() & extractEntityId() of an actor class (for example the ParameterShard actor; you can see the full source code at the link above). Akka's official docs say: *The extractEntityId and extractShardId are two application specific functions to extract the entity identifier and the shard identifier from incoming messages.*

// Imports assumed for this snippet (Breeze for the matrices, classic Akka Cluster Sharding for the extractor types)
import akka.actor.{Actor, ActorLogging}
import akka.cluster.sharding.ShardRegion
import breeze.linalg.DenseMatrix

object ParameterShard {
  case class ParameterRequest(dataShardId: Int, layerId: Int)
  case class LatestParameters(weights: DenseMatrix[Double])
}

// LayerWeight and Gradient are defined elsewhere in the original source linked above.
class ParamServer(shardId: Int,
                  numberOfShards: Int,
                  learningRate: Double,
                  initialWeight: LayerWeight) extends Actor with ActorLogging {

  import ParameterShard._

  val shardName: String = "ParamServer"

  // Placeholder: should incoming messages be handled in these extractors, or in receive() below?
  val extractEntityId: ShardRegion.ExtractEntityId = {
    //case ps: ParameterRequest => (ps.dataShardId.toString, ps)
    ??? // left unimplemented pending that question
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    //case ps: ParameterRequest => (ps.dataShardId % numberOfShards).toString
    ??? // left unimplemented pending that question
  }
  //weights initialize randomly
  var latestParameter: LayerWeight = initialWeight

  def receive = {

    //A layer corresponding to this shardId in some model replica has requested the latest version of the parameters.
    case ParameterRequest(shardId, layerId) => {
      log.info(s"layer ${layerId} weights read by model replica ${shardId}")
      context.sender() ! LatestParameters(latestParameter)
    }

    /*
    A layer corresponding to this shardId in some model replica has computed a gradient, so we must update our
    parameters according to this gradient.
    */
    case Gradient(g, replicaId, layerId) => {
      log.info(s"layer ${layerId} weights updated by model replica ${replicaId}")
      latestParameter = latestParameter + g.t * learningRate
    }

  }

}
Comments:

extractShardId is called to extract the information used to decide which shard is responsible for processing a message, so the message can be routed to the corresponding shard. extractEntityId is called to decide which actor (entity) will process that message. So you just need to implement your application-specific logic in those two functions and Akka will handle the routing for you. – Mustafa Simav

@MustafaSimav can you give an example with extractShardId, extractEntityId, and receive() all present? – Humoyun Ahmad
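To make that comment concrete for the code in the question: the extractors only carry routing logic, while receive() keeps doing the actual message handling. A minimal sketch along those lines (the ParamServerSharding object and the numberOfShards value are my own placeholders, not part of the original code):

object ParamServerSharding {
  import akka.cluster.sharding.ShardRegion
  import ParameterShard.ParameterRequest

  val numberOfShards = 4 // placeholder value for illustration

  // Routing only: which entity (and which shard) a message belongs to.
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg: ParameterRequest => (msg.dataShardId.toString, msg)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case msg: ParameterRequest => (msg.dataShardId % numberOfShards).toString
  }

  // The ParamServer's receive() stays exactly as written above: it still handles
  // ParameterRequest and Gradient once sharding has delivered the message.
}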

1 Answer

1 vote

I don't know anything about DistBelief, so I can't tell which actor should be identified by what; sorry, I can't provide an implementation for your exact case. But let's make up an example: say you want to fetch the /example URL from different domains. Your actor looks like this:

class FetchActor(domain: String) extends Actor {
  def receive = {
    case Fetch => // do the job
  }
}

object FetchActor {
  case object Fetch
}

In order to use Cluster Sharding with that actor, refactor it like this:

import akka.actor.Actor
import akka.cluster.sharding.ShardRegion

class FetchActor extends Actor {
  import FetchActor._

  def receive = {
    case Fetch(domain) => // do the job
  }
}

object FetchActor {
  case class Fetch(domain: String) // move the constructor parameter into the message

  // Which messages FetchActor processes, and which entity each one belongs to
  val entityIdExtractor: ShardRegion.ExtractEntityId = {
    case msg @ Fetch(domain) => (domain, msg)
  }

  // What the shard id is for a given message
  val shardIdExtractor: ShardRegion.ExtractShardId = {
    case Fetch(domain) => domain
  }
}

I decided to use the whole domain as the shard id in that example, but your application may only require the top-level domain.
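For instance, one common alternative (my own sketch, not from the original answer) is to hash the domain into a fixed number of shards, so the shard count stays bounded no matter how many domains you fetch:

import akka.cluster.sharding.ShardRegion
import FetchActor.Fetch

object FetchSharding {
  val numberOfShards = 100 // picked for illustration

  // Hash the domain into a bounded set of shards instead of one shard per domain.
  val hashedShardIdExtractor: ShardRegion.ExtractShardId = {
    case Fetch(domain) => (math.abs(domain.hashCode) % numberOfShards).toString
  }
}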

And that's enough. Just start FetchActor with Cluster Sharding and Akka will handle the distribution of it automatically.
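For completeness, here is a rough sketch of that wiring using the classic ClusterSharding.start API; the actor system name, the region name, and the FetchMain wrapper are made up for illustration:

import akka.actor.{ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import FetchActor.Fetch

object FetchMain extends App {
  val system = ActorSystem("FetchCluster")

  // Register FetchActor as a sharded entity type and get back the shard region.
  val fetchRegion = ClusterSharding(system).start(
    typeName        = "FetchActor",
    entityProps     = Props[FetchActor],
    settings        = ClusterShardingSettings(system),
    extractEntityId = FetchActor.entityIdExtractor,
    extractShardId  = FetchActor.shardIdExtractor
  )

  // Messages go to the region; sharding locates (or creates) the "example.com"
  // entity on whichever node owns that shard and delivers the message there.
  fetchRegion ! Fetch("example.com")
}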