6
votes

I am trying to port some of my R code to Julia. Basically, I have rewritten the following R code in Julia:

# Provides mclapply, used for the multi-core timing.
library(parallel)

# Two independent samples of one million standard-normal draws each.
eps_1<-rnorm(1000000)
eps_2<-rnorm(1000000)

# 1,000,000 x 2 matrix of 0/1 indicators: 1 where the draw is strictly positive.
large_matrix<-ifelse(cbind(eps_1,eps_2)>0,1,0)
# All four (0/1, 0/1) combinations, one per row (a 4 x 2 data frame).
matrix_to_compare = expand.grid(c(0,1),c(0,1))
# First row of every consecutive 4-row block: 1, 5, 9, ...
indices<-seq(1,1000000,4)
# Replace the big matrix by a list of 250,000 blocks of 4 rows x 2 columns.
large_matrix<-lapply(indices,function(i)(large_matrix[i:(i+3),]))

# Return the row numbers of block `x` whose two entries both equal the
# corresponding row of the global `matrix_to_compare`.
#   x: a block with the same shape as matrix_to_compare (4 x 2 here).
# `rowSums(...) == 2` is TRUE when both columns of a row match; `%in% TRUE`
# maps any NA to FALSE before `which` picks out the matching row indices.
function_compare<-function(x){
  which((rowSums(x==matrix_to_compare)==2) %in% TRUE)
}

> system.time(lapply(large_matrix,function_compare))
   user  system elapsed 
 38.812   0.024  38.828 
> system.time(mclapply(large_matrix,function_compare,mc.cores=11))
   user  system elapsed 
 63.128   1.648   6.108 

As one can see, I get a significant speed-up when going from one core to 11 (38.8 s down to 6.1 s elapsed). Now I am trying to do the same in Julia:

#Define cluster: start 11 additional worker processes.

addprocs(11);

# Distributions is loaded on the master only; Iterators is loaded on every
# process because function_split (run on the workers) calls `product`.
using Distributions;
@everywhere using Iterators;
d = Normal();

# Two independent samples of one million draws from d.
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);


#Create a large matrix: 1,000,000 x 2 booleans, true where the draw is >= 0.
# NOTE(review): the R version above tests `> 0`, this one tests `>= 0`.
large_matrix = hcat(eps_1,eps_2).>=0;
# First row of every consecutive 4-row block: 1, 5, 9, ...
indices = collect(1:4:1000000)

#Split large matrix into 250,000 blocks of 4 rows x 2 columns.
# NOTE(review): this rebinds the global `large_matrix` from a matrix to an
# array of matrices.
large_matrix = [large_matrix[i:(i+3),:] for i in indices];

#Define the function to apply:
# Defined on every process so that pmap can call it on the workers.
# Returns the indices of the rows of `x` whose two entries both equal the
# corresponding row of a 4 x 2 pattern matrix built from product([0,1],[0,1]).
#   x: a boolean block that broadcasts against a 4 x 2 matrix (4 x 2 here).
# NOTE(review): the pattern matrix is rebuilt on every call although it never
# changes; with 250,000 calls this accounts for part of the reported allocations.
@everywhere function function_split(x)
    # The four (0/1, 0/1) pairs, reinterpreted as a 2 x 4 Int matrix and
    # transposed so that each pair is one row (4 x 2).
    # presumably the first factor of `product` varies fastest, as in R's expand.grid - confirm
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4)));
    # Convert the 0/1 integers to booleans so they compare against `x`.
    matrix_to_compare = matrix_to_compare.>0;
    # Count matching columns per row (sum along dimension 2); a count of 2
    # means the whole row matches. `find` returns the indices of those rows.
    find(sum(x.==matrix_to_compare,2).==2)
end

# Serial baseline: apply function_split to every block on the master process.
@time map(function_split,large_matrix )
# Same work via pmap, which hands the blocks to the worker processes.
@time pmap(function_split,large_matrix )

   5.167820 seconds (22.00 M allocations: 2.899 GB, 12.83% gc time)
   18.569198 seconds (40.34 M allocations: 2.082 GB, 5.71% gc time)

As one can see, I am not getting any speed-up with pmap; it is actually slower than map (18.6 s versus 5.2 s). Maybe somebody can suggest alternatives.

1
large_matrix is a 250000-element Array{Any,1}: might this be the problem? – daycaster
I do not really know; I am very new to Julia. – Vitalijs
On Julia 0.4.6 I get the following results with addprocs(3): 4.173674 seconds (22.97 M allocations: 2.943 GB, 14.57% gc time) and 0.795733 seconds (292.07 k allocations: 12.377 MB, 0.83% gc time). Also, the type of large_matrix is Array{BitArray{2},1}. – tim
This is very strange. On my MacBook Pro with addprocs(3) I get: 5.860692 seconds (22.90 M allocations: 2.938 GB, 13.20% gc time); @time pmap(function_split,large_matrix ): 27.411076 seconds (40.60 M allocations: 2.094 GB, 3.17% gc time). – Vitalijs
Sorry, I just copied your code before your edit and didn't check the return value of pmap, so it was only fast because it was collecting exceptions. Using the new version I see the same behavior as you. This is because calling your function on a 4x2 array is very fast; pmap is only useful if each function call takes a considerable amount of time. Depending on what you want to do with the resulting array, you may be interested in @parallel. – tim

1 Answer

2
votes

I think that part of the problem here is that @parallel and pmap don't always handle moving data to and from the workers very well. Thus, they tend to work best in situations where what you are executing doesn't require very much data movement at all. I also suspect that there are probably things that could be done to improve their performance, but I'm not certain of the details.

For situations in which you do need more data moving around, it may be best to stick with options that directly call functions on workers, with those functions then accessing objects within the memory space of those workers. I give one example below, which speeds up your function using multiple workers. It uses perhaps the simplest option, which is @everywhere, but @spawn, remotecall() etc. are also worth considering, depending on your situation.

# Start 11 additional worker processes.
addprocs(11);

# Distributions is loaded on the master only; Iterators is loaded on every
# process because function_split (run on the workers) calls `product`.
using Distributions;
@everywhere using Iterators;
d = Normal();

# Two independent samples of one million draws from d.
eps_1 = rand(d,1000000);
eps_2 = rand(d,1000000);

#Create a large matrix: 1,000,000 x 2 booleans, true where the draw is >= 0.
large_matrix = hcat(eps_1,eps_2).>=0;
# First row of every consecutive 4-row block: 1, 5, 9, ...
indices = collect(1:4:1000000);

#Split large matrix into 250,000 blocks of 4 rows x 2 columns.
large_matrix = [large_matrix[i:(i+3),:] for i in indices];

# Give the container an explicit BitArray element type; function_split below
# only accepts BitArray arguments.
large_matrix = convert(Array{BitArray}, large_matrix);

# Define one global variable in Main on process `p` for every keyword
# argument given: `sendto(2; a=1, b=x)` asks process 2 to run `a = 1` and
# `b = x` in its Main module.
# The remote assignments are only scheduled; the function does not wait for
# them to complete.
function sendto(p::Int; args...)
    for binding in args
        name, value = binding
        # Assignment expression `name = value`, evaluated remotely in Main.
        assignment = Expr(:(=), name, value)
        @spawnat(p, eval(Main, assignment))
    end
end

# Fetch the value bound to the name `nm` in module `mod` (Main by default)
# on process `p`, blocking until the value has arrived.
function getfrom(p::Int, nm::Symbol; mod=Main)
    remote_value = @spawnat(p, getfield(mod, nm))
    return fetch(remote_value)
end

# Defined on every process so the workers can call it on their local data.
# Returns the indices of the rows of `x` whose two entries both equal the
# corresponding row of a 4 x 2 pattern matrix built from product([0,1],[0,1]).
#   x: a BitArray block that broadcasts against a 4 x 2 matrix (4 x 2 here).
# NOTE(review): the pattern matrix is rebuilt on every call although it never
# changes; with 250,000 calls this accounts for part of the reported allocations.
@everywhere function function_split(x::BitArray)
    # The four (0/1, 0/1) pairs, reinterpreted as a 2 x 4 Int matrix and
    # transposed so that each pair is one row (4 x 2).
    # presumably the first factor of `product` varies fastest - confirm
    matrix_to_compare = transpose(reinterpret(Int,collect(product([0,1],[0,1])),(2,4)));
    # Convert the 0/1 integers to booleans so they compare against `x`.
    matrix_to_compare = matrix_to_compare.>0;
    # Count matching columns per row (sum along dimension 2); a count of 2
    # means the whole row matches. `find` returns the indices of those rows.
    find(sum(x.==matrix_to_compare,2).==2)
end


# Split `X` into one contiguous chunk per worker and bind each chunk to the
# global name `WorkerName` in Main on that worker.
#
#   X          : array to distribute; it is indexed linearly and its length is
#                taken from size(X, 1), so it is meant for vectors.
#   WorkerName : name of the global variable created on every worker.
#
# Every worker receives div(size(X, 1), nworkers()) consecutive elements; the
# last worker additionally receives the remainder. Returns `nothing` once all
# workers have finished the assignment.
function distribute_data(X::Array, WorkerName::Symbol)
    n_items = size(X, 1)
    pids = workers()
    size_per_worker = div(n_items, length(pids))
    StartIdx = 1
    # @sync blocks until every remote assignment has completed, so callers can
    # use the distributed variable as soon as this function returns.
    @sync for (idx, pid) in enumerate(pids)
        # Each chunk spans exactly size_per_worker elements (the previous
        # `EndIdx + size_per_worker - 1` made all but the first chunk one
        # element short); the last worker takes whatever is left.
        EndIdx = idx == length(pids) ? n_items : StartIdx + size_per_worker - 1
        # Slice on the master so only this chunk, not all of X, is shipped.
        chunk = X[StartIdx:EndIdx]
        @spawnat(pid, Core.eval(Main, Expr(:(=), WorkerName, chunk)))
        StartIdx = EndIdx + 1
    end
    return nothing
end

# Give every worker its own slice of the blocks, stored on the worker under
# the same global name `large_matrix`.
distribute_data(large_matrix, :large_matrix)


# Run function_split over each worker's local `large_matrix` and return the
# per-worker results concatenated in the order of workers().
# Expects `large_matrix` and `function_split` to already exist on every worker.
function parallel_split()
    # Each worker (every process except the master, id 1) maps over its own
    # slice and keeps the outcome in its global `result`.
    @everywhere begin
        if myid() != 1
            result = map(function_split, large_matrix)
        end
    end
    # Pull the per-worker results back to the master, one entry per worker.
    per_worker = Any[getfrom(pid, :result) for pid in workers()]
    return vcat(per_worker...)
end

## results given after running once to compile
@time a = map(function_split,large_matrix); ## 6.499737 seconds (22.00 M allocations: 2.899 GB, 13.99% gc time)
@time b = parallel_split();  ## 1.097586 seconds (1.50 M allocations: 64.508 MB, 3.28% gc time)

julia> a == b
true

Note: even with this, the speed-up from the multiple processes is not perfect. This is to be expected, since your function still returns a moderate amount of data, and moving that data back to the master process takes time.

P.S. See this post (Julia: How to copy data to another processor in Julia) or this package (https://github.com/ChrisRackauckas/ParallelDataTransfer.jl) for more on the sendto and getfrom functions I used here.