1
votes

I am trying to figure out how i could use any of the parallel processing packages like foreach or doParallel in this random forest loop i have created:

  ModelInfo <- data.frame ( model=as.numeric()
                     ,Nodesize=as.numeric()
                     ,Mrty=as.numeric()
                     ,Maxdepth=as.numeric()
                     ,Cp=as.numeric()
                     ,Accuracy_Training=as.numeric()
                     ,AUC_Training=as.numeric())

               w=1

               set.seed(1809)
              NumberOfSamples=1

              # Number of iterations
              rfPred=list()
              pred=list()
              roundpred=list()
              cTab=list()
              Acc=list()
              pred.to.roc=list()
              pred.rocr=list()
              perf.rocr=list()
              AUC=list()
              Var_imp=list()

            rf_model_tr = list()
            length(rf_model_tr) <- NumberOfSamples
            for (i in 1:NumberOfSamples)
            {

            rf_model_tr[[i]] = list()
            rfPred[[i]]=list()
            pred[[i]]=list()
            roundpred[[i]]=list()
            cTab[[i]]=list()
            Acc[[i]]=list()
            pred.to.roc[[i]]=list()
            pred.rocr[[i]]=list()
            perf.rocr[[i]]=list()
            AUC[[i]]=list()
            Var_imp[[i]]=list()

            ## Tune nodesize
            nodesize =c(10,20,50,80,100,200)
            n=length(nodesize)
            length(rf_model_tr[[i]]) <- n
            for ( j in 1: length (nodesize))
            {

rf_model_tr[[i]][[j]] = list()
rfPred[[i]][[j]]=list()
pred[[i]][[j]]=list()
roundpred[[i]][[j]]=list()
cTab[[i]][[j]]=list()
Acc[[i]][[j]]=list()
pred.to.roc[[i]][[j]]=list()
pred.rocr[[i]][[j]]=list()
perf.rocr[[i]][[j]]=list()
AUC[[i]][[j]]=list()
Var_imp[[i]][[j]]=list()

## Tune mrty
mrtysize =c(2,3,4)
m=length(mrtysize)
length(rf_model_tr[[i]][[j]]) <- m
for ( k in 1: length (mrtysize))
{  


  rf_model_tr[[i]][[j]][[k]] = list()
  rfPred[[i]][[j]][[k]]=list()
  pred[[i]][[j]][[k]]=list()
  roundpred[[i]][[j]][[k]]=list()
  cTab[[i]][[j]][[k]]=list()
  Acc[[i]][[j]][[k]]=list()
  pred.to.roc[[i]][[j]][[k]]=list()
  pred.rocr[[i]][[j]][[k]]=list()
  perf.rocr[[i]][[j]][[k]]=list()
  AUC[[i]][[j]][[k]]=list()
  Var_imp[[i]][[j]][[k]]=list()

  ## Tune maxdepth
  maxdep =c(10,20,30)
  z=length(maxdep)
  length(rf_model_tr[[i]][[j]][[k]]) <- z
  for (l in 1:length (maxdep))

  { 
    rf_model_tr[[i]][[j]][[k]][[l]] = list()
    rfPred[[i]][[j]][[k]][[l]]=list()
    pred[[i]][[j]][[k]][[l]]=list()
    roundpred[[i]][[j]][[k]][[l]]=list()
    cTab[[i]][[j]][[k]][[l]]=list()
    Acc[[i]][[j]][[k]][[l]]=list()
    pred.to.roc[[i]][[j]][[k]][[l]]=list()
    pred.rocr[[i]][[j]][[k]][[l]]=list()
    perf.rocr[[i]][[j]][[k]][[l]]=list()
    AUC[[i]][[j]][[k]][[l]]=list()
    Var_imp[[i]][[j]][[k]][[l]]=list()

    ## Tune cp
    cp =c(0,0.01,0.001)
    p=length(cp)
    length(rf_model_tr[[i]][[j]][[k]][[l]]) <- p
    for (m in 1:length (cp))

      {

        rf_model_tr[[i]][[j]][[k]][[l]][[m]]= randomForest  (as.factor(class) ~.


                                                                  , data=train,mtry=mrtysize[[k]],maxDepth = maxdep[[l]], replace=F, importance=T, do.trace=10, ntree=200,nodesize=nodesize[j],cp=cp[[m]])   
        #Accuracy
        rfPred[[i]][[j]][[k]][[l]][[m]] <- predict(rf_model_tr[[i]][[j]][[k]][[l]][[m]], train, type = "prob")
        pred[[i]][[j]][[k]][[l]][[m]] <- colnames(rfPred[[i]][[j]][[k]][[l]][[m]] )[apply(rfPred[[i]][[j]][[k]][[l]][[m]] ,1,which.max)]
        cTab[[i]][[j]][[k]][[l]][[m]]  = table(pred[[i]][[j]][[k]][[l]][[m]],train$class)
        Acc[[i]][[j]][[k]][[l]][[m]]<- sum(diag(cTab[[i]][[j]][[k]][[l]][[m]])) / sum(cTab[[i]][[j]][[k]][[l]][[m]])

        #AUC
        pred.to.roc[[i]][[j]][[k]][[l]][[m]]<-rfPred[[i]][[j]][[k]][[l]][[m]][,2]
        pred.rocr[[i]][[j]][[k]][[l]][[m]]<-prediction(pred.to.roc[[i]][[j]][[k]][[l]][[m]],as.factor(train$class))
        perf.rocr[[i]][[j]][[k]][[l]][[m]]<-performance(pred.rocr[[i]][[j]][[k]][[l]][[m]],measure="auc",x.measure="cutoff")
        AUC[[i]][[j]][[k]][[l]][[m]]<-as.numeric(perf.rocr[[i]][[j]][[k]][[l]][[m]]@y.values)

        #Variable Importance
        Var_imp[[i]][[j]][[k]][[l]][[m]]<-(importance(rf_model_tr[[i]][[j]][[k]][[l]][[m]],type=2))



        ModelInfo[w,1]<-w
        ModelInfo[w,2]<-nodesize[[j]]
        ModelInfo[w,3]<-mrtysize[[k]]
        ModelInfo[w,4]<-maxdep[[l]]
        ModelInfo[w,5]<-cp[[m]]
        ModelInfo[w,6]<-Acc[[i]][[j]][[k]][[l]][[m]]
        ModelInfo[w,7]<-AUC[[i]][[j]][[k]][[l]][[m]]

        w=w+1

      }

    }
  }
}
}

Basically ,what i am doing is that i am creating all possible model variations with one dataset based on the available tuning parameters for a random forest (nodesize,cp ect) and storing that information to the table model info as every iteration goes by. In addition i add measures like accuracy and AUC, so as to compare the different models created in the end and make a pick.

The reason i am looking for an alternative, is that the caret package offers me only to tune the mtry allthough there i do have the chance to run parRF which could solve my problem, but i prefer to incorporate something here, how would that be possible?

I have read about the foreach and doParallel packages but i dont quite get how this could be syntaxed here.

If the initial data is needed please let me know, i just thought at this point to show the part that neeeds to be parallel computed.

Thank you in advance

1
Have you tried Revolution R's distribution? It uses Intel's math libraries to take advantage of SIMD operations in the CPU, hyperthreading and multiple cores to accelerate the core commands themselves. In a quick test, I've found that a simple svd command run 7 times faster on a quad i7 and used all cores (real and virtual)Panagiotis Kanavos

1 Answers

1
votes

Hi I normally just code everything manually. In linux/mac I use parallel package and mclapply which can use memory forking. Forking processes use less memory and are faster to start up. Windows do not support forking thus I use doParallel package (other packages could do also). the foreach() function is a user friendly parallel mapper. I find myself to spend more time setting up single PC parallel computing than saving from speed-up. Still fun :)

If you work on a university, you may have access to a large cluster. The BatchJobs package is another mapper which can use many different backends, e.g. a Torque/PBS que system. I can borrow 80 nodes with 4 CPU's giving me a potential 320 times speedup (more like 150 times in practice). I learned about BatchJobs from this great introduction. I like that BatchJobs also can run single or multi-core locally, which is much easier to debug.

The code below introduces how to create a list of jobs with both foreach and BatchJobs. Each job is a set of arguments. The job arguments are fused with standard arguments and a model is trained. Some statistics is returned and all results and arguments are combined into a data.frame.

useForeach = FALSE #If FALSE, will run as batchjobs. Only faster for cluster computing.
library(randomForest)

#load a data set
url =  "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"
download.file(url,destfile="winequality-white.csv",mode="w")
wwq = read.csv(file="winequality-white.csv",header=T,sep=";")
X = wwq[,names(wwq) != "quality"]
y = wwq[,"quality"]

#2 - make jobs
pars = expand.grid(
  mtry = c(1:3),
  sampsize = floor(seq(1000,1898,length.out = 3)),
  nodesize = c(1,3)
)
jobs = lapply(1:dim(pars)[1], function(i) pars[i,])


#3 - make node function there will excute a number of jobs
test.pars = function(someJobs,useForeach=TRUE) {
  #if running cluster, global environment imported manually
  if(!useForeach) load(file="thisState.rda")
  do.call(rbind,lapply(someJobs,function(aJob){ #do jobs and bind results by rows
    print(aJob)
    merged.args = c(alist(x=X,y=y),as.list(aJob)) #merge std. and job args
    run.time = system.time({rfo = do.call(randomForest,merged.args)}) #run a job
    data.frame(accuracy=tail(rfo$rsq,1),run.time=run.time[3],mse=tail(rfo$mse,1))
  }))
}


##test function single core
jobsToTest = 1:5
out = test.pars(jobs[jobsToTest])
print(cbind(out,do.call(rbind,jobs[jobsToTest])))


#4a execute jobs with foreach package:
if(useForeach) {
  library(foreach)
  library(doParallel)
  CPUs=4 
  cl = makeCluster(CPUs)#works both for windows and linux, otherwise forking is better
  registerDoParallel(cl)
  nodes=min(CPUs,length(jobs)) #how many splits of jobList, not so important for foreach...
  job.array = suppressWarnings(split(jobs,1:nodes)) #split warns if each core cannot get same amount of jobs
  out = foreach(i=job.array,.combine=rbind,.packages="randomForest") %dopar% test.pars(i)
  stopCluster(cl) 

} else {
  library(BatchJobs)
  #4b - execute jobs with BatchJobs package (read manual how to set up on cluster)
  nodes=min(80,length(jobs))   # how many nodes to split job onto
  job.array = split(jobs,1:nodes)
  save(list=ls(),file="thisState.rda") #export this state(global environment) to every node
  #initiate run
  reg = makeRegistry(id ="myFirstBatchJob",packages="randomForest")
  batchMap(reg,fun=test.pars,someJobs = job.array,more.args=list(useForeach=FALSE))
  submitJobs(reg)
  waitForJobs(reg)
  out = loadResults(reg)

  #6- wrap up save filnalResults to user
  finalResult = cbind(do.call(rbind,jobs),do.call(rbind,out))
  save(out,file="finalResult.rda")

  removeRegistry(reg,ask="no")
}

#7- print final result
print(cbind(do.call(rbind,jobs),out))