
I am starting to use Julia for parallel processing.

I am using the @spawn macro in this example, but I had the same error using the remotecall_fetch function.

Following is the code:

using Distributed  # provides addprocs, @spawn, Future
using FASTX        # assumed source of FASTA.Reader

function count_proteins(fpath::String)
    cnt::Int = 0
    if !isfile(fpath)
        write(Base.stderr, "FASTA not found!")
    else
        reader = open(FASTA.Reader, fpath)
        for record in reader
            cnt += 1
        end
    end
    # return the count
    cnt
end


"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1}, threads::Int16=4)
    # initialize workers
    addprocs(threads)

    fut = Dict{Int, Future}()

    # launch the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        fut[i] = r
    end

    for (i, res) in fut
        s = fetch(res)
    end
end

### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
threads = Int16(2)
parallel_count_proteins(flist, threads)

The error happens when I try to fetch the results using fetch():

ERROR: LoadError: On worker 3

...and here is the stacktrace:

Stacktrace:
 [1] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:379
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
 [3] #remotecall_fetch#152 at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [4] remotecall_fetch at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [5] call_on_owner at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
 [6] fetch(::Future) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:511
 [7] parallel_count_proteins(::Array{String,1}, ::Int16) at /Users/salvocos/Google_Drive/julia_programming/mcl_graph_to_label.jl:150
 [8] top-level scope at none:0
 [9] include at ./boot.jl:326 [inlined]
 [10] include_relative(::Module, ::String) at ./loading.jl:1038
 [11] include(::Module, ::String) at ./sysimg.jl:29
 [12] exec_options(::Base.JLOptions) at ./client.jl:267
 [13] _start() at ./client.jl:436

I know I need to make all the workers aware of the existence of the function count_proteins, but I am not quite sure how to do it.


2 Answers


As you said, you need to make count_proteins available to all the worker processes.

You can use the @everywhere macro before your function definitions to make them available to all the workers: @everywhere executes the given expression on all workers.

Another way would be to put the functions that should be available to workers inside another .jl file and @everywhere include("my_helper_functions.jl"), or to put your function definitions inside a begin...end block with @everywhere right before the begin, as sketched below. Either way, you need to do this after the creation of the worker processes. Putting such functions inside a module/package and running @everywhere using MyModule should also work.
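
For instance, the begin...end variant looks like this (a minimal sketch; helper_one and helper_two are hypothetical names used only for illustration):

using Distributed
addprocs(2)  # the workers must exist before @everywhere runs

# a single @everywhere on a begin...end block defines everything
# inside it on all workers (and on the master process too):
@everywhere begin
    helper_one(x) = x + 1
    helper_two(x) = 2 * helper_one(x)
end

# both helpers are now callable on any worker:
remotecall_fetch(helper_two, 2, 10)  # returns 22 from worker 2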

For your code, the solution would be:

using Distributed  # provides addprocs, @everywhere, @spawn, Future

# addprocs here, before the @everywhere definitions
addprocs(2)

# the workers also need the package that provides FASTA.Reader
# (assumed here to be FASTX.jl):
@everywhere using FASTX

@everywhere function count_proteins(fpath::String)
    cnt::Int = 0
    if !isfile(fpath)
        write(Base.stderr, "FASTA not found!")
    else
        reader = open(FASTA.Reader, fpath)
        for record in reader
            cnt += 1
        end
    end
    # return the count
    cnt
end


"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})
    fut = Dict{Int, Future}()

    # launch the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        fut[i] = r
    end

    for (i, res) in fut
        s = fetch(res)
    end
end

### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
parallel_count_proteins(flist)

As a side note, if I understand what you are trying to do correctly, you can simply use pmap here instead, which will send the tasks to the processes one by one, effectively balancing the load.
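
For example, here is a minimal sketch of that approach, assuming count_proteins has already been defined with @everywhere as above (the _pmap suffix is only there to distinguish it from the version above):

"""Count sequences in parallel using pmap."""
function parallel_count_proteins_pmap(fPaths::Array{String, 1})
    # pmap hands the next path to whichever worker is free and
    # returns the counts in the same order as the input paths:
    pmap(count_proteins, fPaths)
end

counts = parallel_count_proteins_pmap(["f1", "f2", "f3", "f4"])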

You might find it useful to read the manual entry on code and data availability in parallel computing, and the Parallel Computing section of the manual overall. For the data-availability part, there is also a package called ParallelDataTransfer.jl, which makes moving data between processes a lot easier if you ever need it.
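
For instance, here is a small sketch of its sendto/getfrom helpers (names as shown in that package's README; treat this as an illustration rather than a full API reference):

using Distributed
addprocs(2)

@everywhere using ParallelDataTransfer

sendto(2, x = 42)    # define the variable x on worker 2
y = getfrom(2, :x)   # copy x back from worker 2; y == 42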


As @hckr nicely explains above, the workers should be deployed (using addprocs(threads)) before using the @everywhere macro.

@everywhere can be called and used in different ways and in different parts of the program. In my case, I am loading the function that I want to run in parallel from a module.

To use this function in parallel from main.jl, I am using @everywhere include("MyModule.jl").

Following is the code for MyModule (MyModule.jl):

module MyModule

using Distributed
using Printf: @printf

"""Count sequences in the input FASTA"""
function count_proteins(fpath::String)::Int
    cnt::Int = 0
    #@show fpath
    if !isfile(fpath)
        write(Base.stderr, "\nInput FASTA not found!")
    else
        open(fpath, "r") do ifd
            for ln in eachline(ifd)
                if startswith(ln, '>')  # safe on empty lines too, unlike ln[1]
                    #println(ln)
                    cnt += 1
                end
            end
        end
    end
    # return the count
    @printf("%s\t%d\n", fpath, cnt)
    cnt
end

"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})

    # spawn the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        # @show r
        s = fetch(r)
    end    
end

And following is main.jl, which uses the function parallel_count_proteins from MyModule:

### main.jl ###
using Distributed

# add path to the modules directory
push!(LOAD_PATH, dirname(@__FILE__)) # MyModule is in the same directory as main.jl

#### MAIN START ####
# deploy the workers
addprocs(4)
# load modules with multi-core functions
@everywhere include(joinpath(dirname(@__FILE__), "MyModule.jl"))

# paths to the 4 input files (all in the same dir as main.jl)
flist = ["tv1", "tv2", "tv3", "tv4"]

# count proteins
MyModule.parallel_count_proteins(flist)
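
Assuming tv1 through tv4 are FASTA files sitting next to main.jl (the names come from the listing above), the script can then be run from the shell, and each count_proteins call prints its "<path>\t<count>" line via @printf:

julia main.jl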