
I am trying to parallelise some code using Scala actors. This is my first real code with actors, although I have some experience with Java multithreading and MPI in C. However, I am completely lost.

The workflow I want to implement is a circular pipeline and can be described as follows:

  • Each worker actor has a reference to another one, thus forming a circle
  • There is a coordinator actor which can trigger a computation by sending a StartWork() message
  • When a worker receives a StartWork() message, it processes some data locally and sends a DoWork(...) message to its neighbour in the circle.
  • The neighbour does some other processing and in turn sends a DoWork(...) message to its own neighbour.
  • This continues until the initial worker receives a DoWork(...) message.
  • The coordinator can send a GetResult() message to the initial worker and wait for a reply.
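The hop counting in the steps above can be checked with a small single-threaded simulation. This is only a sketch, not actor code: `RingSimulation`, `run`, and the bodies of `doStuff`/`doOtherStuff` are hypothetical stand-ins (each worker just appends its index to the data). It uses the same counter as the pseudo-implementation further down: StartWork sends `numWorkers - 1` to the neighbour, each hop decrements it, and the worker that receives `remaining == 0` is the initial one.

```scala
object RingSimulation {
  // Hypothetical stand-ins for the question's doStuff/doOtherStuff
  def doStuff(start: Int): List[Int] = List(start)
  def doOtherStuff(worker: Int, data: List[Int]): List[Int] = data :+ worker

  /** Follows the DoWork messages around a ring of `numWorkers` workers,
    * starting with StartWork at worker `start`. Returns the index of the
    * worker that receives DoWork with remaining == 0, and the data it gets. */
  def run(numWorkers: Int, start: Int): (Int, List[Int]) = {
    var current = (start + 1) % numWorkers // StartWork: send to the neighbour
    var data = doStuff(start)
    var remaining = numWorkers - 1
    while (remaining != 0) {
      data = doOtherStuff(current, data)   // intermediate worker does its part
      current = (current + 1) % numWorkers // and forwards to its own neighbour
      remaining -= 1
    }
    (current, data)
  }
}
```

For example, `RingSimulation.run(4, 1)` returns `(1, List(1, 2, 3, 0))`: the job starts at worker 1, passes through workers 2, 3 and 0, and ends back at worker 1.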

The point is that the coordinator should only receive a result when the data is ready. How can a worker wait until the job has returned to it before answering the GetResult() message?

To speed up computation, any worker can receive a StartWork() at any time.

Here is my first attempt at a pseudo-implementation of the worker:

import scala.actors.Actor

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
   var ready = Foo()
   def act() {
     loop {
       react {
         case StartWork() =>
           val someData = doStuff()
           neighbor ! DoWork( someData, numWorkers-1 )
         case DoWork( resultData, remaining ) => if( remaining == 0 ) {
             ready = resultData
           } else {
             val someOtherData = doOtherStuff( resultData )
             neighbor ! DoWork( someOtherData, remaining-1 )
           }
         // problem: this replies at once, even if the job has not yet come back round the ring
         case GetResult() => reply( ready )
       }
     }
   }
}

On the coordinator side:

worker ! StartWork()
val result = worker !? GetResult() // should wait

2 Answers


Firstly, you clearly need some identifier for each piece of work, so that GetResult can return the correct result. I guess the obvious solution is to have your actors keep a Map of the results and a Map of any waiting getters:

import scala.actors.{Actor, OutputChannel}

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
   var res: Map[Long, Result] = Map.empty
   var gets: Map[Long, OutputChannel[Any]] = Map.empty
   def act() {
     loop {
       react {
         ...
         case DoWork( id, resultData, remaining ) if remaining == 0 =>
           res += (id -> resultData)
           gets.get(id).foreach(_ ! res(id)) // reply to a getter that asked before the result was ready
           gets -= id // that getter has now been answered, so forget it
         case GetResult(id) if res.isDefinedAt(id) => // result is ready
           reply(res(id))
         case GetResult(id) => // no result yet: remember who asked
           gets += (id -> sender)
       }
     }
   }
}

Note: using an if guard in the case pattern can make message processing a bit clearer.
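The bookkeeping in this answer (a results map plus pending getters) can be exercised without an actor library. The following is a minimal single-threaded sketch, assuming a callback `R => Unit` stands in for the stored `OutputChannel`; `ResultStore`, `complete` and `get` are hypothetical names. Inside an actor the handlers run one message at a time, which is why no locking appears here. Unlike the answer's `Map[Long, OutputChannel[Any]]`, it keeps a list of getters per id, so two GetResult requests for the same job are both answered.

```scala
import scala.collection.mutable

// Hypothetical sketch: results keyed by job id, plus callbacks for getters
// that asked before their result was ready.
class ResultStore[R] {
  private val results = mutable.Map.empty[Long, R]
  private val getters = mutable.Map.empty[Long, List[R => Unit]]

  // Corresponds to `case DoWork(id, resultData, remaining) if remaining == 0`
  def complete(id: Long, result: R): Unit = {
    results(id) = result
    // answer everyone who was waiting, then drop them
    getters.remove(id).foreach(_.foreach(reply => reply(result)))
  }

  // Corresponds to the two `case GetResult(id)` branches
  def get(id: Long)(reply: R => Unit): Unit = results.get(id) match {
    case Some(r) => reply(r) // result ready: answer immediately
    case None    => getters(id) = reply :: getters.getOrElse(id, Nil) // defer
  }
}
```

A getter registered before `complete` is called only once the result arrives; a getter registered afterwards is answered straight away.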


One alternative would be this:

import scala.actors.Actor

class Worker( neighbor: Worker, numWorkers: Int ) extends Actor {
   var ready = Foo()
   def act() {
     loop {
       react {
         case StartWork() =>
           val someData = doStuff()
           neighbor ! DoWork( someData, numWorkers-1 )
         case DoWork( resultData, remaining ) => if( remaining == 0 ) {
             ready = resultData
             // wait here: only GetResult is accepted until the result has been handed out
             react {
               case GetResult() => reply( ready )
             }
           } else {
             val someOtherData = doOtherStuff( resultData )
             neighbor ! DoWork( someOtherData, remaining-1 )
           }
       }
     }
   }
}

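After the work has finished, this worker will be stuck in the inner react until it receives the GetResult message; any other messages (such as a new StartWork) wait in its mailbox in the meantime. On the other hand, the coordinator can send GetResult immediately: the outer react has no GetResult case, so the message stays in the mailbox until the inner react picks it up.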
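The same "ask now, get the answer when it is ready" behaviour can also be sketched without actors. This swaps in `scala.concurrent.Promise`/`Future` from the standard library instead of scala.actors, so it is an alternative technique rather than either answer's code; `PromiseRing`, `startWork`, and the stand-in `doStuff`/`doOtherStuff` are hypothetical. The returned `Future` plays the role of the reply to GetResult: the coordinator gets it immediately and blocks in `Await.result` until the job has come back round the ring.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseRing {
  // Hypothetical stand-ins for the question's doStuff/doOtherStuff
  def doStuff(start: Int): List[Int] = List(start)
  def doOtherStuff(worker: Int, data: List[Int]): List[Int] = data :+ worker

  // Starts one job at worker `start`; the Future completes when the job returns there.
  def startWork(numWorkers: Int, start: Int): Future[List[Int]] = {
    val result = Promise[List[Int]]()
    def doWork(worker: Int, data: List[Int], remaining: Int): Unit =
      if (remaining == 0) result.success(data) // back at the initial worker
      else {
        val next = doOtherStuff(worker, data)
        // hand the job to the neighbour asynchronously, like `neighbor ! DoWork(...)`
        Future(doWork((worker + 1) % numWorkers, next, remaining - 1))
      }
    Future(doWork((start + 1) % numWorkers, doStuff(start), numWorkers - 1))
    result.future
  }
}
```

For example, `Await.result(PromiseRing.startWork(4, 1), 5.seconds)` yields `List(1, 2, 3, 0)`. Because each job owns its own Promise, several jobs can run concurrently without a shared results map. Note that if `doOtherStuff` throws, the Promise is never completed and `Await.result` times out.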