
I am spawning a small number of actors to fetch, process and save RSS feed items to a database. This is done from the main method of an object that runs on cron. I create these actors and dole out jobs to them as they complete the previous job assigned to them: my main class spawns a single actor, which doles out jobs to a pool of worker actors. Eventually the program seems to hang. It doesn't exit, but execution halts on all the actors. My CTO believes main is exiting before the actors complete their work and abandoning them, but I am not convinced that's the case: the process never exits at all, successfully or otherwise.

Essentially, I'm wondering how to debug these actors and what could cause this to happen. Will main exit before the actors have completed their execution, and if it does, does that matter? From what I can tell, actors using receive are mapped 1-to-1 to threads; is that correct? Code is below. Please ask any follow-up questions; help is greatly appreciated. I know I may not have provided sufficient detail. I'm new to Scala and actors and will update as needed.

import scala.actors.Actor
import scala.actors.Futures.future

object ActorTester {
  val poolSize = 10
  var pendingQueue :Set[RssFeed] = RssFeed.pendingQueue

  def main(args :Array[String]) {
    val manager = new SpinnerManager(poolSize, pendingQueue)
    manager.start
  }
}

case object Stop

class SpinnerManager(poolSize :Int = 1, var pendingQueue :Set[RssFeed]) extends Actor {
  val pool = new Array[Spinner](poolSize)

  override def start() :Actor = {
    for (i <- 0 to (poolSize - 1)) {
      val spinner = new Spinner(i)
      spinner.start()
      pool(i) = spinner
    }
    super.start
  }

  def act() {
    for {
      s <- pool
      if (!pendingQueue.isEmpty)
     } {
       s ! pendingQueue.head
       pendingQueue = pendingQueue.tail
     }

    while(true) {
      receive {
        case id :Int => {
          if (!pendingQueue.isEmpty) {
            pool(id) ! pendingQueue.head
            pendingQueue = pendingQueue.tail             
          } else if ((true /: pool) { (done, s) => {
            if (s.getState != Actor.State.Runnable) {
              val exited = future {
                s ! Stop
                done && true
              }
              exited()
            } else {
              done && false
            }
          }}) {
            exit
          }
        } 
      }
    }
  }
}

class Spinner(id :Int) extends Actor {
  def act() {
    while(true) {
      receive {
        case dbFeed :RssFeed => {
          //process rss feed
          //this has multiple network requests, to the original blogs, bing image api
          //our instance of solr - some of these spawn their own actors
          sender ! id
        }
        case Stop => exit
      }
    }
  }
}

2 Answers


For one thing, you're making a tiny but important mistake in the foldLeft that determines whether all Spinner actors have "terminated" or not. Each if branch should evaluate to done && true or done && false, respectively, but currently it just says true or false without regard to done.

For example, imagine 4 Spinner actors where the first three are still Runnable and only the fourth is not. Because each step ignores done, the result of your foldLeft is decided by the last actor alone, so it would be true even though three actors haven't finished yet. If you were using a logical && on done, you'd get the correct result (false).

This is possibly also what causes your application to hang.
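A minimal, actor-free sketch of the difference. It uses a plain List[Boolean] as a hypothetical stand-in for the Spinners' states (true meaning "still Runnable") so it runs without scala.actors; FoldDemo and its methods are illustrative names, not part of the original code.

```scala
object FoldDemo {
  // Hypothetical stand-in for the Spinners' states: true means "still Runnable".
  // Buggy version: each step ignores the accumulator `done`,
  // so the final result depends only on the last element.
  def allIdleBuggy(busy: List[Boolean]): Boolean =
    busy.foldLeft(true) { (done, isBusy) => if (!isBusy) true else false }

  // Fixed version: each step combines its own answer with `done`.
  def allIdle(busy: List[Boolean]): Boolean =
    busy.foldLeft(true) { (done, isBusy) => done && !isBusy }

  def main(args: Array[String]): Unit = {
    val states = List(true, true, true, false) // three busy, the last one idle
    println(allIdleBuggy(states)) // true, even though three are still busy
    println(allIdle(states))      // false
    println(states.forall(!_))    // false: forall expresses the same check directly
  }
}
```

forall says the same thing as the corrected fold with less room for error, which is why the revised code in this answer uses pool forall (...).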

EDIT: There was also an issue with a race condition. The following code works now; I hope it helps. By the way, I was wondering: doesn't Scala's actor implementation automatically make use of worker threads?

import scala.actors.Actor
import scala.collection.mutable.Queue

case class RssFeed()

case object Stop // singleton message, so `case Stop` below matches it exactly

class Spinner(id: Int) extends Actor {
  def act() {
    loop {
      react {
        case dbFeed: RssFeed => {
          // Process RSS feed
          sender ! id
        }
        case Stop => exit()
      }
    }
  }
}

class SpinnerManager(poolSize: Int, pendingQueue: Queue[RssFeed]) extends Actor {
  val pool = Array.tabulate(poolSize)(new Spinner(_).start())

  def act() {
    for (s <- pool; if (!pendingQueue.isEmpty)) {
      pendingQueue.synchronized {
        s ! pendingQueue.dequeue()
      }
    }

    loop {
      react {
        case id: Int => pendingQueue.synchronized {
          if (!pendingQueue.isEmpty) {
            Console println id
            pool(id) ! pendingQueue.dequeue()
          } else {
            if (pool forall (_.getState != Actor.State.Runnable)) {
              pool foreach (_ ! Stop)
              exit()
            }
          }
        }
      }
    }
  }

}

object ActorTester {
  def main(args: Array[String]) {
    val poolSize = 10
    val pendingQueue: Queue[RssFeed] = Queue.tabulate(100)(_ => RssFeed())
    new SpinnerManager(poolSize, pendingQueue).start()
  }
}
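The revised code still decides when to stop by polling getState, and that check may race with the last Spinner: right after it sends its id, the Spinner can still be Runnable for a moment, so the forall test fails, no further message arrives, and the manager may wait forever. A common alternative is to count jobs in flight and stop exactly when that count reaches zero with nothing pending. The sketch below shows only that bookkeeping, using a java.util.concurrent thread pool and a LinkedBlockingQueue in place of scala.actors (a swapped-in technique, not the answerer's code); CountingManager, run and the string "feeds" are hypothetical.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

object CountingManager {
  // Swapped-in technique: a JDK thread pool plus a completion queue stand in for
  // the Spinner actors and their `sender ! id` replies. Feeds are plain strings here.
  def run(feeds: Seq[String], poolSize: Int): Int = {
    val pending   = mutable.Queue(feeds: _*)
    val finished  = new LinkedBlockingQueue[String]() // completion reports
    val executor  = Executors.newFixedThreadPool(poolSize)
    var inFlight  = 0 // jobs handed out but not yet reported back
    var processed = 0

    // Hand the next pending feed to the pool; the worker reports back when done.
    def dispatch(): Unit = {
      val feed = pending.dequeue()
      inFlight += 1
      executor.execute(new Runnable {
        def run(): Unit = finished.put(feed) // "process" the feed, then report
      })
    }

    // Prime the pool with at most poolSize jobs.
    while (inFlight < poolSize && pending.nonEmpty) dispatch()

    // Only this thread touches pending/inFlight, so no locking is needed.
    // The loop ends exactly when nothing is pending and nothing is in flight.
    while (inFlight > 0) {
      finished.take() // blocks until some worker reports back
      inFlight -= 1
      processed += 1
      if (pending.nonEmpty) dispatch()
    }

    executor.shutdown() // without this, the pool's non-daemon threads keep the JVM alive
    executor.awaitTermination(10, TimeUnit.SECONDS)
    processed
  }

  def main(args: Array[String]): Unit =
    println(run((1 to 100).map(i => s"feed-$i"), poolSize = 10))
}
```

Because termination depends on a counter the manager owns rather than on a snapshot of other threads' states, there is no window in which the last reply can be missed.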

So after several days of debugging I've solved this issue. fotNelton's code suggestions were very helpful in doing so, so I've given him a vote. However, they didn't address the problem itself.

What I found is that, when running this from a main method, if parent actors exit before their child actors, the program will hang forever and never exit, still holding all of its memory. In the process of handling an RSS feed, a Fetcher (the equivalent of Spinner in the question) would spawn actors and send them messages to do things involving network requests. These actors need to complete their work before the parent actor quits. Fetcher wouldn't wait for these actors to finish, though; once it sent the message, it would just move on. So it would tell the manager it was finished before its child actors had finished all their work.

To deal with this, one option would be to use futures and wait until the actors are done (pretty slow). My solution was to create services accessible via URL (a POST to a service that has an actor waiting to react). The service responds right away and sends a message to its own actor. Thus the Fetchers can quit once they send the request to the service, and don't need to spawn any other actors.
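The futures option mentioned above can be sketched with the standard library's scala.concurrent.Future (a swapped-in technique; the scala.actors futures of that era had a different API). In this sketch, which uses the hypothetical names WaitForChildren, subTasks and processFeed, a Fetcher-like worker starts its child tasks and returns only once all of them have completed, so it never reports "done" to the manager too early.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WaitForChildren {
  // Hypothetical child tasks for one feed (stand-ins for the blog fetch,
  // image search and Solr update mentioned in the question).
  def subTasks(feed: String)(implicit ec: ExecutionContext): Seq[Future[String]] =
    Seq("blog", "images", "solr").map(step => Future(s"$feed:$step"))

  // Process one feed and return only after every child task has finished,
  // so the manager is told "done" only when the work is really done.
  def processFeed(feed: String)(implicit ec: ExecutionContext): Seq[String] =
    Await.result(Future.sequence(subTasks(feed)), 30.seconds)

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    println(processFeed("feed-1"))
  }
}
```

Await.result blocks the calling thread until every child finishes (or the 30-second limit expires), which is consistent with the observation that this approach is slow.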

import scala.actors.{Actor, TIMEOUT}
import scala.collection.mutable.Queue

object FeedFetcher {
  val poolSize = 10
  var pendingQueue :Queue[RssFeed] = RssFeed.pendingQueue

  def main(args :Array[String]) {
    new FetcherManager(poolSize, pendingQueue).start
  }
}

case object Stop

class FetcherManager(poolSize :Int = 1, var pendingQueue :Queue[RssFeed]) extends Actor {
  val pool = new Array[Fetcher](poolSize)
  var numberProcessed = 0

  override def start() :Actor = {
    for (i <- 0 to (poolSize - 1)) {
      val fetcher = new Fetcher(i)
      fetcher.start()
      pool(i) = fetcher
    }
    super.start
  }

  def act() {
    for {
      f <- pool
      if (!pendingQueue.isEmpty)
     } {
      pendingQueue.synchronized {
        f ! pendingQueue.dequeue
      }
    }

    loop {
      reactWithin(10000L) {
        case id :Int => pendingQueue.synchronized {
          numberProcessed = numberProcessed + 1
          if (!pendingQueue.isEmpty) {
            pool(id) ! pendingQueue.dequeue             
          } else if ((true /: pool) { (done, f) => {
            if (f.getState == Actor.State.Suspended) {
              f ! Stop
              done && true
            } else if (f.getState == Actor.State.Terminated) {
              done && true
            } else {
              false
            }
          }}) {
            pool foreach { f => {
              println(f.getState)
            }}
            println("Processed " + numberProcessed + " feeds total.")
            exit
          }
        }
        case TIMEOUT => {
          if (pendingQueue.isEmpty) {
            println("Manager just woke up from timeout with all feeds assigned.")
            pool foreach { f => {
              if (f.getState == Actor.State.Suspended) {
                println("Sending Stop to Fetcher " + f.id)
                f ! Stop
              }
            }}
            println("Checking state of all Fetchers for termination.")
            if ((true /: pool) { (done, f) => {
              done && (f.getState == Actor.State.Terminated)
            }}) {
              exit
            }
          }
        }
      }
    }
  }
}

class Fetcher(val id :Int) extends Actor {
  var feedsIveDone = 0
  def act() {
    loop {
      react {
        case dbFeed :RssFeed => {
          println("Fetcher " + id + " starting feed")
          //process rss feed here
          feedsIveDone = feedsIveDone + 1
          sender ! id
        }
        case Stop => {
          println(id + " exiting")
          println(feedsIveDone)
          exit
        }
      }
    }
  }