
First my question:

  • is it possible to prevent Julia from copying variables on each iteration of a parallel for loop?
  • if not, how can I implement a parallel reduce operation in Julia?

Now the details:

I have this program:

data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
  fct(data)::DataFrame
end

It works nicely functionality-wise, but each parallel call to fct(data) copies the whole data frame to the worker, making everything painfully slow.

Ideally, I would like to load the data once and have each worker operate on its pre-loaded copy. I came up with this code to do so:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
  if (i - 1) % nworkers() == myid() - 2 # round-robin over worker ids (they start at 2)
    fct = filter_functions[i]
    filtered_data_temp = fct(data)
  end
  # How to vcat all the filtered_data_temp ?
end

But now I have another problem: I cannot figure out how to vcat() all the filtered_data_temp onto a variable in the worker with myid()==1.

I would very much appreciate any insight.

Note: I am aware of Operating in parallel on a large constant datastructure in Julia. Yet, I don't believe it applies to my problem because all my filter_functions do operate on the array as a whole.
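The `@parallel vcat for` pattern above can also be written as a parallel map followed by a serial reduce, which is the usual way to express a parallel reduce in Julia. A minimal sketch with toy stand-ins for the real `data` and `filter_functions` (note that `pmap` still serializes its argument to each worker, so this alone does not solve the copying problem):

```julia
using Distributed  # pmap lives here in recent Julia versions

# Toy stand-ins for the real `data` and `filter_functions`.
data = [1, 2, 3]
square(v) = v .^ 2
negate(v) = -v
filter_functions = [square, negate]

# Map each filter over the shared data in parallel, then concatenate serially.
results = reduce(vcat, pmap(f -> f(data), filter_functions))
# results == [1, 4, 9, -1, -2, -3]
```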


2 Answers


You might want to look into loading your data into DistributedArrays.

EDIT: Probably something like this:

using DistributedArrays

data = DataFrames.readtable("...")
dfiltered_data = distribute(data) # distributes data among processes automagically
filter_functions = [ fct1, fct2, fct3 ... ]
for fct in filter_functions
  dfiltered_data = fct(dfiltered_data)
end

You can also check the package's unit tests for more examples.
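To show just the distribute/map mechanics, here is a minimal sketch on a plain numeric array (toy data, not the DataFrame above), assuming DistributedArrays.jl is installed:

```julia
using Distributed
addprocs(2)
@everywhere using DistributedArrays

data = collect(1:10)
ddata = distribute(data)         # split the array across the two workers
dsquared = map(x -> x^2, ddata)  # each worker maps over its local chunk
collect(dsquared)                # gather the result back on the master
```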


In the end, I found the solution to my question there: Julia: How to copy data to another processor in Julia.

In particular, it introduces the following primitive for retrieving a variable from another process:

getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
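As a quick, self-contained check of this primitive, the sketch below sets a throwaway `local_results` global on one worker and pulls it back (the `remotecall_fetch`/`Core.eval` call is only a demo device for defining the remote variable):

```julia
using Distributed
addprocs(1)

# The primitive quoted above.
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))

w = first(workers())
# Define a throwaway global on the worker, then fetch it back by name.
remotecall_fetch(Core.eval, w, Main, :(local_results = [10, 20]))
getfrom(w, :local_results)  # == [10, 20]
```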

Below is how I am using it:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
# Each worker executes its share of the filter functions and accumulates locally
@everywhere begin
  local_results = DataFrame() # empty frame to vcat onto
  for i in 1:length(filter_functions)
    if (i - 1) % nworkers() == myid() - 2 # round-robin over worker ids (they start at 2)
      fct = filter_functions[i]
      filtered_data_temp = fct(data)
      local_results = vcat(local_results, filtered_data_temp)
    end
  end
end
# Concatenate all the local results on the master process
all_results = DataFrame()
for wid in workers()
  worker_local_results = getfrom(wid, :local_results)
  all_results = vcat(all_results, worker_local_results)
end