1
votes

We use a distributed Erlang cluster, and I am now testing how it behaves during net splits.

To get information from all nodes of the cluster I use gen_server:multi_call/4 with a defined timeout. What I need is to get information from the available nodes as soon as possible, so the timeout is not too big (about 3000 ms). Here is an example call:

Timeout = 3000,
Nodes = AllConfiguredNodes,
gen_server:multi_call(Nodes, broker, get_score, Timeout).

I expect this call to return a result within Timeout ms. But during a net split it does not: it waits approximately 8 seconds.

What I found is that the multi_call request is blocked for an additional 5 seconds in the call to erlang:monitor(process, {Name, Node}), before the request is even sent.

I really do not care that some node does not reply, is busy, or is not available; I can use any other node. But because of this blocking I am forced to wait while the Erlang VM tries to establish a new connection to the dead or unavailable node.

The question is: do you know a solution that can prevent this blocking? Or maybe another RPC mechanism that is suitable for my situation?

2
Look at the updated answer; I think it solves your problem. - Jr0

2 Answers

0
votes

I'm not sure if I totally understand the problem you are trying to solve, but if it is to get all the answers that can be retrieved in X amount of time and ignore the rest, you might try a combination of async_call and nb_yield.

Maybe something like

somefun() ->
    SmallTimeMs = 50,
    Nodes = AllConfiguredNodes,
    Promises = [rpc:async_call(N, some_mod, some_fun, ArgList) || N <- Nodes],
    get_results([], Promises, SmallTimeMs).


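get_results(Results, _Promises, _SmallTimeMs) when length(Results) > 1 ->   % Replace 1 with whatever is the minimum acceptable number of results
    Results;
get_results(Results, [], _SmallTimeMs) ->   % Nothing left to wait for
    Results;
get_results(Results, Promises, SmallTimeMs) ->
    {New, Pending} = get_promises(Promises, SmallTimeMs),
    get_results(New ++ Results, Pending, SmallTimeMs).


%% Poll each promise once; a key that has already yielded is not polled again.
get_promises(Promises, WaitMs) ->
    lists:foldl(fun(Key, {Vals, Pending}) ->
                        case rpc:nb_yield(Key, WaitMs) of
                            {value, Val} -> {[Val | Vals], Pending};
                            timeout -> {Vals, [Key | Pending]}
                        end
                end, {[], []}, Promises).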

See: http://erlang.org/doc/man/rpc.html#async_call-4 for more details.

0
votes

My solution to the problem.

I've made my own implementation of multicall that uses gen_server:call. The basic idea is to call each node with gen_server:call() in a separate process and collect the results of these calls. Collection is done by receiving messages from the mailbox of the calling process.

To control the timeout I calculate the deadline at which the timeout expires and then use it as a reference point to calculate the timeout for each after clause in receive.
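The deadline arithmetic can be isolated in two small helpers. This is only a sketch: the module name deadline and the functions new/1 and remaining/1 are hypothetical and not part of the implementation below. It also assumes that an infinity timeout should simply be passed through unchanged, which receive accepts in its after clause.

```erlang
-module(deadline).
-export([new/1, remaining/1]).

%% Turn a relative timeout in milliseconds into an absolute deadline
%% on the monotonic clock. 'infinity' is passed through unchanged.
new(infinity) ->
    infinity;
new(TimeoutMs) when is_integer(TimeoutMs), TimeoutMs >= 0 ->
    erlang:monotonic_time(millisecond) + TimeoutMs.

%% Milliseconds left until the deadline, never negative, so the
%% value can be used directly in a receive ... after clause.
remaining(infinity) ->
    infinity;
remaining(Deadline) when is_integer(Deadline) ->
    max(0, Deadline - erlang:monotonic_time(millisecond)).
```

Each receive would then use deadline:remaining(Deadline) as its after value, so the total wait never exceeds the original timeout no matter how many messages are read.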

Implementation

Main function is:

multicall(Nodes, Name, Req, Timeout) ->
    Refs = lists:map(fun(Node) -> call_node(Node, Name, Req, Timeout) end, Nodes),
    Results = read_all(Refs, Timeout),
    PosResults = [ { Node, Result } || { ok, { ok, { Node, Result } } } <- Results ],
    { PosResults, calc_bad_nodes(Nodes, PosResults) }.

The idea here is to call all nodes and wait for all results within a single Timeout.

Each node is called from a spawned process, which catches the exits that gen_server:call raises in case of an error.

call_node(Node, Name, Req, Timeout) ->
    Ref = make_ref(),
    Self = self(),
    spawn_link(fun() ->
                       try
                           Result = gen_server:call({Name,Node},Req,Timeout),
                           Self ! { Ref, { ok, { Node, Result } } }
                       catch
                           exit:Exit ->
                               Self ! { Ref, { error, { 'EXIT', Exit } } }
                       end
               end),
    Ref.

Bad nodes are calculated as those that did not respond successfully within Timeout:

calc_bad_nodes(Nodes, PosResults) ->
    { GoodNodes, _ } = lists:unzip(PosResults),
    [ BadNode || BadNode <- Nodes, not lists:member(BadNode, GoodNodes) ].

Results are collected by reading the mailbox with Timeout:

read_all(ReadList, Timeout) ->
    Now = erlang:monotonic_time(millisecond),
    Deadline = Now + Timeout,
    read_all_impl(ReadList, Deadline, []).

The implementation keeps reading until the Deadline is reached; after that, the remaining references are polled with a zero timeout:

read_all_impl([], _, Results) ->
    lists:reverse(Results);
read_all_impl([ W | Rest ], expired, Results) ->
    R = read(0, W),
    read_all_impl(Rest, expired, [R | Results ]);
read_all_impl([ W | Rest ] = L, Deadline, Results) ->
    Now = erlang:monotonic_time(millisecond),
    case Deadline - Now of
        Timeout when Timeout > 0 ->
            R = read(Timeout, W),
            case R of
                { ok, _ } ->
                    read_all_impl(Rest, Deadline, [ R | Results ]);
                { error, { read_timeout, _ } } ->
                    read_all_impl(Rest, expired, [ R | Results ])
            end;
        Timeout when Timeout =< 0 ->
            read_all_impl(L, expired, Results)
    end.

A single read is just a receive from the mailbox with a Timeout:

read(Timeout, Ref) ->
    receive
        { Ref, Result } ->
            { ok, Result }
    after Timeout ->
            { error, { read_timeout, Timeout } }
    end.

Further improvements:

  • The rpc module spawns a separate process so that late answers do not pile up as garbage in the caller's mailbox. It would be useful to do the same in this multicall function.
  • An infinity timeout can be handled in the obvious way.
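The first improvement could look roughly like the sketch below. The module name multicall_proxy and the function in_proxy/2 are hypothetical. The assumption is that the whole multicall runs inside a short-lived middleman process: worker replies go to that process, and once it has exited, any late reply is addressed to a dead pid and is dropped instead of landing in the real caller's mailbox.

```erlang
-module(multicall_proxy).
-export([in_proxy/2]).

%% Run Fun in a monitored middleman process and wait up to Timeout
%% (milliseconds or 'infinity') for its return value.
in_proxy(Fun, Timeout) when is_function(Fun, 0) ->
    Parent = self(),
    Tag = make_ref(),
    {Pid, MRef} = spawn_monitor(fun() -> Parent ! {Tag, Fun()} end),
    receive
        {Tag, Result} ->
            %% Normal completion: drop the monitor and its 'DOWN' message.
            erlang:demonitor(MRef, [flush]),
            {ok, Result};
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The middleman crashed before it could reply.
            {error, Reason}
    after Timeout ->
        exit(Pid, kill),
        erlang:demonitor(MRef, [flush]),
        %% Discard a result that raced with the timeout.
        receive {Tag, _} -> ok after 0 -> ok end,
        {error, timeout}
    end.
```

With the multicall/4 function above, a call might then be written as multicall_proxy:in_proxy(fun() -> multicall(Nodes, broker, get_score, 3000) end, infinity), leaving the inner Timeout to bound the wait.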