1 vote

This might be one for the philosophers... (or @Steve Weston or @Martin Morgan)

I've been having some issues with memory leaks when using parLapply, and after digging through enough threads on the matter, I think this question is well warranted. I've taken some time to try to figure this one out, and while I have an inkling as to why the observed behavior happens, I'm lost as to how to resolve it.

Consider the following as a sourced script, saved as: parallel_question.R

rf.parallel <- function(n = 10) {
  library(parallel)
  library(randomForest)

  rf.form <- as.formula(paste("Final", paste(c("x", "y", "z"), collapse = "+"), sep = " ~ "))

  rf.df <- data.frame(Final = runif(10000), y = runif(10000), x = runif(10000), z = runif(10000))

  # one chunk of rows per worker
  rf.df.list <- split(rf.df, rep(1:n, nrow(rf.df))[1:nrow(rf.df)])

  cl <- makeCluster(n)
  rf.list <- parLapply(cl, rf.df.list, function(x, rf.form, n) {
    randomForest::randomForest(rf.form, x, ntree = 100, nodesize = 10, norm.votes = FALSE)
  }, rf.form, n)
  stopCluster(cl)

  return(rf.list)
}

We source and run the script with:

scrip.loc<-"G:\\Scripts_Library\\R\\Stack_Answers\\parallel_question.R"

source(scrip.loc)

rf.parallel(n=10)

Fairly straightforward... we ran several random forests in parallel. Seems to be memory efficient. We could combine them later, or do something else. Handy. Nice. Well behaved.
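For example, the "combine them later" step could be sketched like this (randomForest provides combine() for merging forests grown separately):

rf.list <- rf.parallel(n = 10)
rf.all <- do.call(randomForest::combine, unname(rf.list))  # one forest of 10 * 100 = 1000 trees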

Now consider the following script, saved as parallel_question_2.R

rf.parallel_2 <- function(n = 10) {
  library(parallel)
  library(magrittr)
  library(randomForest)

  rf.form <- as.formula(paste("Final", paste(c("x", "y", "z"), collapse = "+"), sep = " ~ "))

  rf.df <- data.frame(Final = runif(10000), y = runif(10000), x = runif(10000), z = runif(10000))

  # a large (~3 GB) object that the parallel code never touches
  large.list <- rep(rf.df, 10000)

  rf.df.list <- split(rf.df, rep(1:n, nrow(rf.df))[1:nrow(rf.df)])

  cl <- makeCluster(n)
  rf.list <- parLapply(cl, rf.df.list, function(x, rf.form, n) {
    randomForest::randomForest(rf.form, x, ntree = 100, nodesize = 10, norm.votes = FALSE)
  }, rf.form, n)
  stopCluster(cl)

  return(rf.list)
}

In this second script, we've created a large list in our sourced environment. We never reference the list or pass it into our parallelized function. I've set the size of the list so that it will probably be a problem on at least a 32 GB machine.

scrip.loc<-"G:\\Scripts_Library\\R\\Stack_Answers\\parallel_question_2.R"

source(scrip.loc)

rf.parallel_2(n=10)
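For reference, the size of large.list can be checked with object.size():

rf.df <- data.frame(Final = runif(10000), y = runif(10000), x = runif(10000), z = runif(10000))
large.list <- rep(rf.df, 10000)
format(object.size(large.list), units = "GB")
## roughly 3 GB (40,000 numeric vectors of length 10,000)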

When we run the second script, we end up carrying around an extra ~3 GB (the size of our large list) multiplied by the number of worker processes in the cluster. If we run the contents of the second script in a non-sourced environment, this does not happen; rather, we get one ~3 GB list, the parallelized function runs without issue, and that's the end of it.

So... how/why are the worker environments picking up unnecessary variables from the parent environment? Why does it only happen in sourced scripts? How can I mitigate this when I have a sourced, large, and complex script which has sub-sections that are parallelized (but may have 3-10 GB of intermediate data being carried around)?

Relevant or similar threads:

Using parLapply and clusterExport inside a function

clusterExport, environment and variable scoping


2 Answers

7 votes

The signature of parLapply(cl, X, FUN, ...) applies FUN to each element of X. The worker needs to know FUN, so FUN is serialized and sent to the worker. What is an R function? It's the code that defines the function, plus the environment in which the function was defined. Why the environment? Because in R it's legal to reference variables defined outside of FUN, e.g.,

f = function(y) x + y
x = 1; f(1)
## [1] 2

As a second complexity, R allows the function to update variables outside the function:

f = function(y) { x <<- x + 1; x + y }
x = 1; f(1)
## [1] 3

In the above, we can imagine that we could figure out which parts of the environment of f() need to be seen (only the variable x), but in general this kind of analysis is not possible without actually evaluating the function, e.g.,
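f = function(y, name) get(name) + y
x = 1
f(1, "x")
## [1] 2

Here the variable f() consults is known only when it is called, so the serializer cannot safely prune the environment in advance.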

So for FUN to be evaluated on the worker, the worker needs to know both the definition of FUN and the content of the environment FUN was defined in. R lets the worker know about FUN by using serialize(). The consequence is easy to see

f = function(n) { x = sample(n); length(serialize(function() {}, NULL)) }
f(1)
## [1] 754
f(10)
## [1] 1064
f(100)
## [1] 1424

Larger objects in the environment result in more information sent to / used by the worker.

If you think about it, the description so far would mean that the entire R session should be serialized to the worker (or to disk, if serialize() were being used to save objects): the environment of the implicit function in f() includes the body of f(), but also the environment of f(), which is the global environment, and the environment of the global environment, which is the search path... (check out environment(f) and parent.env(.GlobalEnv)).

R has an arbitrary rule that it stops at the global environment. So instead of using an implicit function() {}, define the function in the .GlobalEnv

g = function() {}
f = function(n) { x = sample(n); length(serialize(g, NULL)) }
f(1)
## [1] 592
f(1000)
## [1] 592

Note also that this has consequences for which functions can be serialized. For instance, if g() were serialized in the code below, it would 'know' about x:

f = function(y) { x = 1; g = function(y) x + y; g(y) }
f(1)
## [1] 2

but here it does not -- a function knows about the symbols in the environment(s) it was defined in, but not about the symbols in the environment it was called from.

rm(x)
g = function(y) x + y
f = function(y) { x = 1; g() }
f()
## Error in g() : object 'x' not found

In your script, you could compare

cl = makeCluster(2)
f = function(n) {
    x = sample(n)
    parLapply(
        cl, 1,
        function(...)
            length(serialize(environment(), NULL))
    )
}
f(1)[[1]]
## [1] 256
f(1000)[[1]]
## [1] 4252

with

g = function(...) length(serialize(environment(), NULL))
f = function(n) {
    x = sample(n)
    parLapply(cl, 1, g)
}
f(1)[[1]]
## [1] 150
f(1000)[[1]]
## [1] 150
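
Applied to the script in the question, the same fix is to move the worker function (and the formula, which also carries the environment it was created in) to the top level of the sourced file, so that serialization stops at .GlobalEnv and large.list stays on the master. A sketch, reusing the names from the question:

library(parallel)

## Defined at top level: their environments are .GlobalEnv, where
## serialization stops, so rf.parallel_2()'s frame (which holds
## large.list) is never shipped to the workers.
rf.form = Final ~ x + y + z
rf.fit = function(x, rf.form) {
    randomForest::randomForest(rf.form, x, ntree = 100, nodesize = 10, norm.votes = FALSE)
}

rf.parallel_2 = function(n = 10) {
    rf.df = data.frame(Final = runif(10000), y = runif(10000), x = runif(10000), z = runif(10000))
    large.list = rep(rf.df, 10000)   # stays on the master
    rf.df.list = split(rf.df, rep(1:n, length.out = nrow(rf.df)))
    cl = makeCluster(n)
    on.exit(stopCluster(cl))
    parLapply(cl, rf.df.list, rf.fit, rf.form)
}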

0 votes

Towards the end of processing I was passing close to 50 GB of data back into parLapply, which was not... ideal.

I ended up creating a new function that called parLapply. I placed it inside my nested loop, created a new environment there, set its parent environment to the .GlobalEnv, copied only the variables the workers needed into the new environment, and then passed that environment to clusterExport.
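A minimal sketch of that pattern (the names here are hypothetical, not my actual code):

library(parallel)

## Worker function at top level, so nothing extra rides along with it.
fit.one = function(i) nrow(needed.df)

run.chunk = function(cl, big.intermediate, needed.df) {
    ## Fresh environment whose parent is .GlobalEnv; copy in only the
    ## variables the workers need, then export from that environment.
    env = new.env(parent = globalenv())
    env$needed.df = needed.df
    clusterExport(cl, varlist = "needed.df", envir = env)
    ## big.intermediate is never exported or serialized to the workers.
    parLapply(cl, 1:4, fit.one)
}

cl = makeCluster(2)
res = run.chunk(cl, big.intermediate = rep(list(runif(1e6)), 10),
                needed.df = data.frame(a = 1:10))
stopCluster(cl)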

For details on environments, I'd recommend this blog post. I also found the book Parallel R by Ethan McCallum and Stephen Weston helpful; pages 15-17 discuss this issue in the context of the 'snow' package.