20
votes

I have a large piece of code, and the aggregation step is currently the speed bottleneck.

I'd like to speed up the data grouping step. A simple, non-trivial example (SNOTE) of my data looks like this:

library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by=a])
   user  system elapsed 
 60.107   3.143  63.534

This is quite fast for such a large data example, but I am still looking for further speed-up. I have multiple cores available, so I am almost sure there must be a way to take advantage of that computational capability.

I am open to changing my data type to a data.frame or an idata.frame (idata.frames are supposedly faster than data.frames).

I did some research, and it seems the plyr package has some parallel capabilities that could be helpful, but I am still struggling with how to apply them to the grouping I am trying to do. In another SO post they discuss some of these ideas. I am still unsure how much I would gain from this parallelization, since it uses the foreach function. In my experience, foreach is not a good idea for millions of fast operations, because the communication overhead between cores ends up cancelling out the gain from parallelization.

2
Please be more specific about what the words "concatenate" and "aggregate" mean. Three functions come to mind: list, c, and paste. And what is that code supposed to do? Are we extracting columns from data frames or working on data.tables? What are the structures of 'block.read.parent.cigar' and the other input variables? Explain this problem better! (Apparently someone else agrees. That's not my downvote.) - IRTFM
@Dwin, thanks! I am not sure if I clarified enough in the Q, but the basic Q is how to speed up an aggregating operation for a large data table like the one in the example above. Also keep in mind that I can use multiple cores, so there may be some smart parallelization ideas that could speed up such an operation considerably. Hope this helps; I added an example. - Dnaiel
I didn't downvote. But the reason I would is that you haven't provided any information about the data. If read.index is a row index, then of course grouping every row into a row by itself is going to be slow. You'll be calling paste millions of times. Did you use Rprof? Did you use verbose=TRUE? And you're using words like "too slow" without giving numbers. In fact, I've talked myself into downvoting it now. It can be reversed if you improve the question. - Matt Dowle
@Dnaiel That's now a great question. +1. I'll try and take a look. Some answerers just have new question feeds I guess, so to get more attention it might be an idea to offer a bounty. - Matt Dowle
@MattDowle thanks a lot, I am glad I improved such a confusing Q :-) Not sure how great it is, but that's the problem I am dealing with. I am learning how to ask better questions, so it's good for me. - Dnaiel

2 Answers

13
votes

Can you parallelize aggregation with data.table? Yes.

Is it worth it? NO. This is a key point that the previous answer failed to highlight.

As Matt Dowle explains in data.table and parallel computing, copies ("chunks") need to be made before being distributed when running operations in parallel. This slows things down. In some cases, when you cannot use data.table (e.g. running many linear regressions), it is worth splitting up tasks between cores. But not aggregation — at least when data.table is involved.

In short (and until proven otherwise), aggregate using data.table and stop worrying about potential speed increases using doMC. data.table is already blazing fast compared to anything else available when it comes to aggregation — even if it's not multicore!
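To make the copying overhead concrete, here is a minimal sketch (not taken from the linked discussion) that compares a single grouped call with a chunked version of the same aggregation. The toy data, the choice of 4 chunks, and the names n_chunks, chunks, agg_by and agg_chunked are all assumptions made for illustration. The chunks are processed with plain lapply so the sketch runs anywhere; handing the same chunks to parallel workers would still require building them first.

```r
library(data.table)

set.seed(1)
n_groups <- 1000L
dt <- data.table(
  a = sample(seq_len(n_groups), 2L * n_groups, replace = TRUE),
  b = sample(c("3m", "5m", "1m"), 2L * n_groups, replace = TRUE)
)
setkey(dt, a)

# Single grouped call: data.table iterates over the groups internally.
agg_by <- dt[, list(b = paste(b, collapse = "")), by = a]
setkey(agg_by, a)

# Chunked version: every group is assigned to exactly one chunk, so no group
# straddles two chunks. Each subset below is a separate copy of part of dt;
# this copying is the overhead that has to be paid before any worker starts.
n_chunks <- 4L  # hypothetical chunk count
chunks <- lapply(seq_len(n_chunks) - 1L, function(k) dt[a %% n_chunks == k])

# Aggregate each chunk independently, then stitch the pieces back together.
agg_chunked <- rbindlist(lapply(chunks, function(chunk) {
  chunk[, list(b = paste(b, collapse = "")), by = a]
}))
setkey(agg_chunked, a)

# Both routes should give the same per-group strings.
identical(agg_by$b, agg_chunked$b)
```

Timing the two routes with system.time on your own data is the easiest way to see whether the chunking cost is recovered.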


Here are some benchmarks you can run for yourself, comparing data.table's internal aggregation (using by) against foreach and mclapply. The results are listed first.

#-----------------------------------------------

# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1)  0.007 -- data.table using `by`
# (2)  3.548 -- mclapply with rbindlist
# (3)  5.557 -- foreach with rbindlist
# (4)  5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply

# ----------------------------------------------

library(data.table)

## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")

# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.table's `by` to aggregate
round(rowMeans(replicate(3, system.time({
    dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by=a]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617

## using `lapply`
round(rowMeans(replicate(3, system.time({
    results <- lapply(unique(dt[["a"]]), function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
    })
    rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000

# USING `mclapply` FORKING ---------------------------------
## use mclapply (from the parallel package)
library(parallel)
round(rowMeans(replicate(3, system.time({
    results <- mclapply(unique(dt[["a"]]),
    function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
    rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000


# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4

## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000

## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    results <-
      foreach(x=unique(dt[["a"]])) %dopar%
        dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000

registerDoSEQ()
getDoParWorkers()
# [1] 1

8
votes

If you have multiple cores available, why not leverage the fact that you can quickly filter and group rows in a data.table using its key?

library(doMC)
registerDoMC(cores=4)


setkey(dt, "a")

finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]

Note that if the number of unique groups (i.e. length(unique(a))) is relatively small, it will be faster to drop the .combine argument, get the results back in a list, and then call rbindlist on the results. In my testing on two cores and 8GB RAM, the threshold was at about 9,000 unique values. Here is what I used to benchmark:

# (option a)
round(rowMeans(replicate(3, system.time({
# ------- #
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3) 
# [1]  1.243 elapsed for N ==  1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000



# (option b)
round(rowMeans(replicate(3, system.time({
# ------- #
    results <- 
      foreach(x=unique(dt[["a"]])) %dopar% 
         dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
# ------- #
}))), 3)
# [1]  1.117 elapsed for N ==  1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000


## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
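
The two ways of combining per-group results mentioned above can be sketched sequentially, without a parallel backend. This is only an illustration under assumptions: the toy data is made up, and Reduce with rbind is used here as a stand-in for incremental row-binding, not as a description of how foreach's .combine works internally.

```r
library(data.table)

set.seed(1)
dt <- data.table(
  a = sample(1:50, 200, replace = TRUE),
  b = sample(c("3m", "5m", "1m"), 200, replace = TRUE),
  key = "a"
)

# One small data.table per group, similar to what each iteration returns.
results <- lapply(unique(dt[["a"]]), function(x) {
  dt[.(x), list(a = a[1], b = paste(b, collapse = ""))]
})

# Strategy 1: bind the pieces pairwise, growing the result one step at a time.
combined_pairwise <- Reduce(function(acc, nxt) rbind(acc, nxt), results)

# Strategy 2: keep the list and bind everything in a single rbindlist call.
combined_once <- rbindlist(results)

# Both strategies should produce the same rows in the same order.
identical(combined_pairwise$b, combined_once$b)
```

Which strategy wins depends on the number of groups, so it is worth timing both on your own data.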