6
votes

Suppose I need to process files in a given folder in parallel. In Java I would create a FolderReader thread to read file names from the folder and a pool of FileProcessor threads. FolderReader reads file names and submits the file processing function (Runnable) to the pool executor.

In Scala I see two options:

  • create a pool of FileProcessor actors and schedule a file processing function with Actors.Scheduler.
  • create an actor for each file name while reading the file names.

Does it make sense? What is the best option?

Concurrent hard drive access is not good. It causes extra magnetic head movements. Don't do that. - xiefei
Actors are the wrong thing to use here. - Daniel C. Sobral
@DanielC.Sobral Thanks. Could you explain why and what to use instead? - Michael
@Michael Actors in Scala don't work well with blocking tasks, such as the one described. Some of the answers below provide alternatives. - Daniel C. Sobral
Unless your files are on different devices or your processing effort is very CPU bound, processing files in parallel is unlikely to be useful. Also, you use the word "concurrent" but I think you mean "parallel." Actors are for concurrency, not parallelism. Futures and parallel data structures are for parallelism. - James Iry

6 Answers

3
votes

I strongly suggest staying as far away from raw threads as you can. Luckily we have better abstractions that take care of what's happening underneath. In your case it seems to me that you do not need actors (although you could use them); a simpler abstraction, called Futures, is enough. They are part of the Akka open source library, and I think in the future they will be part of the Scala standard library as well.

A Future[T] is simply something that will return a T in the future.

All you need to run a future is an implicit ExecutionContext, which you can derive from a Java executor service. Then you can enjoy the elegant API and the fact that a future is a monad: you can transform collections into collections of futures, collect the results, and so on. I suggest taking a look at http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

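import akka.dispatch.{Await, ExecutionContext, Future}
import akka.util.duration._
import java.util.concurrent.Executors

object TestingFutures {
  // Fixed pool of 20 threads backing every future created below
  val executorService = Executors.newFixedThreadPool(20)
  implicit val executionContext = ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList: List[String]): List[String] = {
    // Start one future per element and gather the results into a single future
    val futureOfList: Future[List[String]] = Future.traverse(myList) { aString =>
      Future {
        aString.reverse
      }
    }
    // Block until every future has completed, for at most one minute
    Await.result(futureOfList, 1 minute)
  }
}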

There's a lot going on here:

  • I am using Future.traverse, which takes a collection M[A] (with M[A] <: Traversable[A]) as its first parameter and a function A => Future[B] (or, if you prefer, a Function1[A, Future[B]]) as its second, and returns a Future[M[B]]. In the example above both A and B are String.
  • I am using the Future.apply method to create an instance of Future[T] from a block of code.
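To make the shape of traverse concrete, here is a minimal sketch in which the input and output element types differ. It assumes the scala.concurrent API from the Scala standard library (2.10 and later) rather than the akka.dispatch package used above; TraverseSketch, process and the pool size are hypothetical names and values chosen for illustration.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical stand-in for real per-file work: just the length of the name
  def process(name: String): Int = name.length

  def processAll(names: List[String]): List[Int] = {
    val pool = Executors.newFixedThreadPool(4) // pool size is an arbitrary assumption
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // A List[String] plus a String => Future[Int] yields a Future[List[Int]]
      val all: Future[List[Int]] = Future.traverse(names)(name => Future(process(name)))
      // Block until every element has been processed, for at most one minute
      Await.result(all, 1.minute)
    } finally pool.shutdown()
  }
}
```

Calling TraverseSketch.processAll(List("a.txt", "bb.txt")) should give back the results in the same order as the input list, regardless of which future finishes first.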

There are many other reasons to look at Akka futures.

  • Futures can be mapped because they are monads, i.e. you can chain the execution of futures:

    Future { 3 }.map { _ * 2 }.map { _.toString }

  • Futures have callbacks: future.onComplete, onSuccess, onFailure, andThen, etc.

  • Futures support not only traverse but also for comprehensions
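As a rough illustration of the last two points, the sketch below combines two futures with a for comprehension and registers a callback on the result. It assumes scala.concurrent from the standard library and its global execution context instead of Akka's package; ComposeSketch, the file name and the factor are made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object ComposeSketch {
  def combined(): Future[String] = {
    // Both futures are started before the for comprehension, so they can run in parallel
    val lengthF = Future { "report.txt".length } // hypothetical file name
    val factorF = Future { 2 }                   // hypothetical second computation
    for {
      length <- lengthF
      factor <- factorF
    } yield (length * factor).toString
  }

  def run(): String = {
    val result = combined()
    // The callback runs asynchronously on the execution context once the future completes
    result.onComplete {
      case Success(text)  => println("done: " + text)
      case Failure(error) => println("failed: " + error)
    }
    // Blocking here is only for the sake of the example
    Await.result(result, 10.seconds)
  }
}
```

Note that if the futures were created inside the for comprehension instead, the second one would only start after the first had completed.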

10
votes

Depending on what you're doing, it may be as simple as

for(file<-files.par){
   //process the file
}

2
votes

Ideally you should use two actors. One for reading the list of files, and one for actually reading the file.

You start the process by simply sending a single "start" message to the first actor. The actor can then read the list of files, and send a message to the second actor. The second actor then reads the file and processes the contents.

Having multiple actors might seem complicated, but it is actually a good thing: you have a bunch of objects communicating with each other, as in a theoretical OO system.

Edit: you REALLY shouldn't be doing concurrent reading of a single file.

2
votes

I was going to write up exactly what @Edmondo1984 did except he beat me to it. :) I second his suggestion in a big way. I'll also suggest that you read the documentation for Akka 2.0.2. As well, I'll give you a slightly more concrete example:

import akka.dispatch.{ExecutionContext, Future, Await}
import akka.util.duration._
import java.util.concurrent.Executors
import java.io.File

val execService = Executors.newCachedThreadPool()
implicit val execContext = ExecutionContext.fromExecutorService(execService)

val tmp = new File("/tmp/")
val files = tmp.listFiles()
val workers = files.map { f =>
  Future {
    f.getAbsolutePath()
  }
}.toSeq
val result = Future.sequence(workers)
result.onSuccess {
  case filenames =>
    filenames.foreach { fn =>
      println(fn)
    }
}

// Artificial just to make things work for the example
Thread.sleep(100)
execContext.shutdown()

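Here I use sequence instead of traverse. sequence turns an existing collection of futures into a future of a collection, while traverse creates the futures and combines them in one step; which one fits better is going to depend on your needs.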
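For comparison, here is a small sketch of the same computation written both ways. It assumes scala.concurrent from the standard library and its global execution context rather than akka.dispatch; SequenceVsTraverse and the use of the name length as the "work" are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceVsTraverse {
  // sequence: build the futures first, then turn List[Future[Int]] into Future[List[Int]]
  def viaSequence(names: List[String]): List[Int] = {
    val futures: List[Future[Int]] = names.map(n => Future(n.length))
    Await.result(Future.sequence(futures), 10.seconds)
  }

  // traverse: create and combine the futures in one step, with no intermediate list
  def viaTraverse(names: List[String]): List[Int] =
    Await.result(Future.traverse(names)(n => Future(n.length)), 10.seconds)
}
```

Both methods should return the same list for the same input; sequence is the natural choice when you already hold the futures, traverse when you start from plain values.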

Go with the Future, my friend; the Actor is just a more painful approach in this instance.


2
votes

But if we do use actors, what's wrong with that?

Suppose we have to read from and write to some property files. Here is my example; it is in Java, but it still uses Akka actors.

Let's say we have an actor, ActorFile, that represents one file. Hm, it probably cannot represent just one file, right? (It would be nice if it could.) So instead it represents several files, like PropertyFilesActor.

Why not use something like this:

public class PropertyFilesActor extends UntypedActor {

    Map<String, String> filesContent = new LinkedHashMap<String, String>();

    { // here we should use real files, of course
        filesContent.put("file1.xml", "");
        filesContent.put("file2.xml", "");
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof WriteMessage)  {
            WriteMessage writeMessage = (WriteMessage) message;
            String content = filesContent.get(writeMessage.fileName);
            String newContent = content + writeMessage.stringToWrite;
            filesContent.put(writeMessage.fileName, newContent);
        }

        else if (message instanceof ReadMessage) {
            ReadMessage readMessage = (ReadMessage) message;
            String currentContent = filesContent.get(readMessage.fileName);
            // Send the current content back to the sender
            getSender().tell(new ReadMessage(readMessage.fileName, currentContent), getSelf());
        }

        else unhandled(message);

    }

}

...each message carries the file name as a parameter (fileName).

It has its own in-box, accepting messages like:

  1. WriteLine(fileName, string)
  2. ReadLine(fileName, string)

Those messages are stored in the inbox in order, one after another. The actor does its work by taking messages from the inbox one at a time, storing or reading, and sending feedback to the sender (sender ! message).

Thus, say we write to the property file and then show its content on a web page. We can start rendering the page right after sending the message that stores the data, and as soon as we receive the feedback, update part of the page (via Ajax) with the data from the just-updated file.
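The ordering guarantee described above can be imitated without Akka. The sketch below is not an actor: a single-threaded executor stands in for the inbox, so tasks run one at a time in the order they were submitted. MailboxSketch, the file name and the written strings are all hypothetical, and scala.concurrent from the standard library is assumed.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object MailboxSketch {
  def demo(): String = {
    // One worker thread: tasks are queued and executed strictly in submission order
    val single = Executors.newSingleThreadExecutor()
    implicit val inbox: ExecutionContext = ExecutionContext.fromExecutorService(single)
    // In-memory stand-in for the files, touched only by the single worker thread
    val contents = mutable.Map("file1.xml" -> "")
    try {
      // Two "write" messages followed by one "read" message
      Future { contents("file1.xml") += "a=1;" }
      Future { contents("file1.xml") += "b=2;" }
      val read = Future { contents("file1.xml") }
      // The read is queued after both writes, so it sees their combined effect
      Await.result(read, 10.seconds)
    } finally single.shutdown()
  }
}
```

Because only one thread ever touches the map, no locking is needed, which is the same property the actor gets from processing its inbox one message at a time.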

1
votes

Well, grab your files and stick them in a parallel structure

scala> new java.io.File("/tmp").listFiles.par
res0: scala.collection.parallel.mutable.ParArray[java.io.File] = ParArray( ... )

Then...

scala> res0 map (_.length)
res1: scala.collection.parallel.mutable.ParArray[Long] = ParArray(4943, 1960, 4208, 103266, 363 ... )