16
votes

What's the best way to have an actor sleep? I have actors set up as agents which want to maintain different parts of a database (including getting data from external sources). For a number of reasons (including not overloading the database or communications and general load issues), I want the actors to sleep between each operation. I'm looking at something like 10 actor objects.

The actors will run pretty much infinitely, as there will always be new data coming in, or sitting in a table waiting to be propagated to other parts of the database etc. The idea is for the database to be as complete as possible at any point in time.

I could do this with an infinite loop and a sleep at the end of each iteration, but according to http://www.scala-lang.org/node/242 actors use a thread pool which is expanded whenever all threads are blocked. So I imagine a Thread.sleep in each actor would be a bad idea, as it would waste threads unnecessarily.

I could perhaps have a central actor with its own loop that sends messages to subscribers on a clock (like async event clock observers)?

Has anyone done anything similar or have any suggestions? Sorry for extra (perhaps superfluous) information.

Cheers

Joe


4 Answers

17
votes

There's no need to explicitly cause an actor to sleep: using loop and react for each actor means that the underlying thread pool will have waiting threads whilst there are no messages for the actors to process.

In the case that you want to schedule events for your actors to process, this is pretty easy using a single-threaded scheduler from the java.util.concurrent utilities:

object Scheduler {
  import java.util.concurrent.Executors
  import java.util.concurrent.TimeUnit
  // One shared thread fires every scheduled task
  private lazy val sched = Executors.newSingleThreadScheduledExecutor()
  // Runs f once, `time` milliseconds from now
  def schedule(f: => Unit, time: Long): Unit = {
    sched.schedule(new Runnable {
      def run(): Unit = f
    }, time, TimeUnit.MILLISECONDS)
  }
}

You could extend this to take periodic tasks and it might be used thus:

val execTime = //...  
Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)

Your target actor will then simply need to implement an appropriate react loop to process the given message. There is no need for you to have any actor sleep.
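The periodic extension mentioned above could be sketched roughly as follows. This is only an illustration built on java.util.concurrent; the object name PeriodicScheduler, its method names and its parameter names are hypothetical and not part of the answer's code.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical periodic variant of the Scheduler object (names are assumptions).
object PeriodicScheduler {
  // One shared thread fires every scheduled task.
  private lazy val sched = Executors.newSingleThreadScheduledExecutor()

  // Runs f every periodMillis ms, starting after initialDelayMillis ms.
  // Note: if f throws, the executor suppresses all later runs of that task.
  def scheduleAtFixedRate(f: => Unit, initialDelayMillis: Long, periodMillis: Long): Unit = {
    sched.scheduleAtFixedRate(new Runnable {
      def run(): Unit = f
    }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS)
    ()
  }

  // Stops the worker thread; pending periodic runs are dropped by default.
  def shutdown(): Unit = sched.shutdown()
}
```

With this in place, something like PeriodicScheduler.scheduleAtFixedRate({ target ! message; () }, 0L, 2000L) would send the message every two seconds, and the target actor would still only need its react loop.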

21
votes

The first answer made a good point about Erlang, but it seems to have disappeared. You can easily do the same Erlang-like trick with Scala actors. E.g. let's create a scheduler that does not use threads:

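import scala.actors.{Actor, TIMEOUT}

def scheduler(time: Long)(f: => Unit) = {
  // If no message arrives within `time` ms, TIMEOUT fires: run f and wait again
  def fixedRateLoop {
    Actor.reactWithin(time) {
      case TIMEOUT => f; fixedRateLoop
      case 'stop => // no recursion, so the scheduler actor terminates
    }
  }
  Actor.actor(fixedRateLoop)
}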

And let's test it (I did it right in Scala REPL) using a test client actor:

case class Ping(t: Long)

import Actor._
val test = actor { loop {
  receiveWithin(3000) {
    case Ping(t) => println(t/1000)
    case TIMEOUT => println("TIMEOUT")
    case 'stop => exit
  }
} }

Run the scheduler:

import compat.Platform.currentTime
val sched = scheduler(2000) { test ! Ping(currentTime) }

and you will see something like this

scala> 1249383399
1249383401
1249383403
1249383405
1249383407

which means our scheduler sends a message every 2 seconds as expected. Let's stop the scheduler:

sched ! 'stop

the test client will begin to report timeouts:

scala> TIMEOUT
TIMEOUT
TIMEOUT

stop it as well:

test ! 'stop

4
votes

ActorPing (Apache License) from lift-util has schedule and scheduleAtFixedRate. Source: ActorPing.scala

From scaladoc:

The ActorPing object schedules an actor to be ping-ed with a given message at specific intervals. The schedule methods return a ScheduledFuture object which can be cancelled if necessary
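To illustrate the cancellation idea only, here is a minimal sketch using plain java.util.concurrent. It is not ActorPing's actual code or API; the object name CancellableScheduler and its parameters are hypothetical.

```scala
import java.util.concurrent.{Callable, Executors, ScheduledFuture, TimeUnit}

// Hypothetical helper (not ActorPing itself): hands back the future so callers can cancel.
object CancellableScheduler {
  private lazy val sched = Executors.newSingleThreadScheduledExecutor()

  // Schedules f to run once after delayMillis ms and returns a handle to it.
  def schedule(f: => Unit, delayMillis: Long): ScheduledFuture[Unit] =
    sched.schedule(new Callable[Unit] {
      def call(): Unit = f
    }, delayMillis, TimeUnit.MILLISECONDS)

  // Stops the worker thread so the JVM can exit.
  def shutdown(): Unit = sched.shutdown()
}
```

A caller would keep the returned future and invoke cancel(false) on it if the ping is no longer wanted; a task cancelled before its delay expires never runs.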

2
votes

Unfortunately, there are two errors in oxbow_lakes's answer.

One is a simple declaration mistake (long time vs time: Long), but the second is somewhat more subtle.

oxbow_lakes declares run as

def run = actors.Scheduler.execute(f) 

This, however, leads to messages disappearing from time to time. That is, they are scheduled but never get sent. Declaring run as

def run = f

fixed it for me. It's done exactly this way in lift-util's ActorPing.

The whole scheduler code becomes:

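import java.util.concurrent.{Executors, TimeUnit}
import scala.compat.Platform

object Scheduler {
    private lazy val sched = Executors.newSingleThreadScheduledExecutor()
    // time is an absolute timestamp in ms; the delay is measured from now
    def schedule(f: => Unit, time: Long) {
        sched.schedule(new Runnable {
          def run = f
        }, time - Platform.currentTime, TimeUnit.MILLISECONDS)
    }
}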
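Because this version subtracts Platform.currentTime, its time argument is an absolute timestamp in milliseconds rather than a delay. A small sketch of that conversion, with the hypothetical names DelayMath and delayUntil, and using System.currentTimeMillis (which is what Platform.currentTime returns):

```scala
// Hypothetical helper showing the timestamp-to-delay conversion on its own.
object DelayMath {
  // Converts an absolute epoch-millisecond timestamp into a non-negative delay.
  // nowMillis is a parameter only so the arithmetic is easy to check.
  def delayUntil(execTimeMillis: Long, nowMillis: Long = System.currentTimeMillis()): Long =
    math.max(0L, execTimeMillis - nowMillis)
}
```

The clamp to zero is optional here, since the executor's schedule methods treat zero and negative delays as a request to run immediately.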

I tried to edit oxbow_lakes's post, but could not save it (broken?), nor do I have the rights to comment yet. Therefore a new post.