4 votes

I'm currently developing an R package that will use parallel computing to solve some tasks, by means of the "parallel" package.

I'm getting some really awkward behavior when using clusters defined inside functions of my package: parLapply assigns a job to a worker and waits for it to finish before assigning a job to the next worker. At least, that is what appears to be happening, judging from the log file "cluster.log" and the list of running processes in the Unix shell.

Below is a mockup version of the original function declared inside my package:

.parSolver <- function( varMatrix, var1 ) {

    no_cores <- detectCores()

    #Rows in varMatrix
    rows <- 1:nrow(varMatrix[,])

    # Split rows in n parts
    n <- no_cores
    parts <- split(rows, cut(rows, n))

    # Initiate cluster
    cl <- makePSOCKcluster(no_cores, methods = FALSE, outfile = "/home/cluster.log")
    clusterEvalQ(cl, library(raster))
    clusterExport(cl, "varMatrix", envir=environment())
    clusterExport(cl, "var1", envir=environment())


    rParts <- parLapply(cl = cl, X = 1:n, fun = function(x){
        part <- rasterize(varMatrix[parts[[x]],], raster(var1), .....)
        print(x)
        return(part)
        })

    do.call(merge, rParts)
}

NOTES:

  • I'm using makePSOCKcluster because I want the code to run on Windows and Unix systems alike, although this particular problem only manifests itself on a Unix system.
  • The functions rasterize and raster are defined in library(raster), which is loaded on the cluster workers.

The weird part to me is that if I execute the exact same code of the parSolver function in the global environment, everything works smoothly: all workers take a job at the same time and the task completes in no time. However, if I do something like:

library(myPackage)

varMatrix <- (...)
var1 <- (...)
result <- parSolver(varMatrix, var1)

the described problem appears.

It appears to be a load-balancing problem; however, that does not explain why it works fine in one situation and not in the other.

Am I missing something here? Thanks in advance.

Usually when I use a parallel cluster in a function, I declare the cluster in the global environment, and I have never had problems using parLapply after that. I cannot tell you that it will solve your problem, but you can try: initiate your cluster outside the function, add a cl parameter to your parSolver(varMatrix, var1, cl) function, and use n <- length(cl) inside the function. – Sébastien Rochette

I had already tried that approach but unfortunately the problem remains... – cgmr

1 Answer

3 votes

I don't think parLapply is running sequentially. More likely, it's just running inefficiently, making it appear to run sequentially.

I have a few suggestions to improve it:

  • Don't define the worker function inside parSolver
  • Don't export all of varMatrix to each worker
  • Create the cluster outside of parSolver

The first point is important, because as your example now stands, all of the variables defined in parSolver will be serialized along with the anonymous worker function and sent to the workers by parLapply. By defining the worker function outside of any function, the serialization won't capture any unwanted variables.
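To make that concrete, here is a minimal sketch (not taken from the question; make_worker and the object sizes are made up) showing how a closure defined inside a function drags its enclosing environment along when it is serialized, which is what happens when parLapply ships an anonymous worker function to the workers:

# A function defined inside another function closes over that function's
# environment, so serializing it also serializes everything defined there.
make_worker <- function() {
    big <- matrix(runif(1e6), 1000, 1000)  # ~8 MB that the worker never uses
    function(x) x + 1                      # closure capturing 'big' via its environment
}
inner <- make_worker()
length(serialize(inner, NULL))             # several megabytes: 'big' travels along

top_level <- function(x) x + 1             # defined at top level (global environment)
length(serialize(top_level, NULL))         # a few hundred bytes: just the function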

The second point avoids unnecessary socket I/O and uses less memory, making the code more scalable.
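As a rough illustration of the difference (a sketch I've added, not part of the original answer; it assumes a two-worker PSOCK cluster and uses made-up names big and chunks), clusterExport copies the whole object to every worker, whereas passing pre-split chunks through clusterApply sends each worker only its own piece:

library(parallel)
cl <- makePSOCKcluster(2)

big <- matrix(runif(1e6), 1000, 1000)

# clusterExport: every worker receives a copy of the full 1000 x 1000 matrix
clusterExport(cl, "big")

# clusterApply: each worker receives only its own chunk of rows
chunks <- lapply(splitIndices(nrow(big), length(cl)),
                 function(i) big[i, , drop = FALSE])
print(clusterApply(cl, chunks, dim))  # each worker only ever sees a 500 x 1000 piece

stopCluster(cl)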

Here's a fake but self-contained example, similar to yours, that demonstrates my suggestions:

# Define worker function outside of any function to avoid
# serialization problems (such as unexpected variable capture)
workerfn <- function(mat, var1) {
    library(raster)
    mat * var1
}

parSolver <- function(cl, varMatrix, var1) {
    parts <- splitIndices(nrow(varMatrix), length(cl))
    varMatrixParts <- lapply(parts, function(i) varMatrix[i,,drop=FALSE])
    rParts <- clusterApply(cl, varMatrixParts, workerfn, var1)
    do.call(rbind, rParts)
}

library(parallel)
cl <- makePSOCKcluster(3)
r <- parSolver(cl, matrix(1:20, 10, 2), 2)
print(r)
stopCluster(cl)  # shut the workers down when done

Note that this takes advantage of the clusterApply function to iterate over a list of row-chunks of varMatrix, so that the entire matrix doesn't need to be sent to every worker. It also avoids calls to clusterEvalQ and clusterExport, simplifying the code as well as making it a bit more efficient.
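For reference, splitIndices is the helper from the parallel package that does the row-chunking above; a quick check of its output (my own toy sizes, not from the answer) shows the roughly equal, contiguous pieces each worker ends up with:

library(parallel)
# Split the indices 1..10 into 3 chunks, one per worker;
# the chunks are contiguous and of roughly equal size (e.g. 1:3, 4:7, 8:10).
splitIndices(10, 3)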