My solution to the problem.
I wrote my own implementation of multicall that uses gen_server:call.
The basic idea is to call each node with gen_server:call() in a separate process and collect the results of these calls. The results are collected by receiving messages from the mailbox of the calling process.
To control the timeout, I compute the deadline at which the timeout expires and then use it as a reference point to compute the timeout for the after clause in receive.
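The deadline idea can be sketched on its own, as below. This is a minimal illustration, not the code used in the rest of this answer: the module name deadline_demo and the helpers deadline/1, remaining/1 and recv/2 are hypothetical names chosen for the example.

```erlang
-module(deadline_demo).
-export([deadline/1, remaining/1, recv/2]).

%% Turn a relative timeout (milliseconds) into an absolute deadline
%% on the monotonic clock.
deadline(Timeout) when is_integer(Timeout), Timeout >= 0 ->
    erlang:monotonic_time(millisecond) + Timeout.

%% Milliseconds left until Deadline, clamped at 0 so the value is
%% always valid in an `after` clause.
remaining(Deadline) ->
    max(0, Deadline - erlang:monotonic_time(millisecond)).

%% Wait for a single {Ref, Msg} message, but never past Deadline.
recv(Ref, Deadline) ->
    receive
        {Ref, Msg} -> {ok, Msg}
    after remaining(Deadline) ->
        timeout
    end.
```

Because every receive recomputes its timeout from the same deadline, several receives in a row share one overall time budget instead of each getting a full Timeout.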
Implementation
The main function is:
multicall(Nodes, Name, Req, Timeout) ->
    %% Start one worker process per node; each returns a unique reference.
    Refs = lists:map(fun(Node) -> call_node(Node, Name, Req, Timeout) end, Nodes),
    %% Wait for every reply within one shared Timeout.
    Results = read_all(Refs, Timeout),
    PosResults = [ { Node, Result } || { ok, { ok, { Node, Result } } } <- Results ],
    { PosResults, calc_bad_nodes(Nodes, PosResults) }.
The idea here is to call all nodes and wait for all results within a single Timeout.
Each node is called from a spawned process, which catches the exits that gen_server:call raises in case of an error.
call_node(Node, Name, Req, Timeout) ->
    Ref = make_ref(),
    Self = self(),
    spawn_link(fun() ->
        try
            Result = gen_server:call({Name, Node}, Req, Timeout),
            Self ! { Ref, { ok, { Node, Result } } }
        catch
            %% gen_server:call reports failures (e.g. timeout, noproc) as exits.
            exit:Exit ->
                Self ! { Ref, { error, { 'EXIT', Exit } } }
        end
    end),
    Ref.
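To see what the catch clause receives, here is a small standalone sketch of the same try/catch pattern. The module safe_call_demo and the function safe_call/3 are hypothetical names for this illustration; only gen_server:call/3 is a real library function.

```erlang
-module(safe_call_demo).
-export([safe_call/3]).

%% Wrap gen_server:call/3 so that a failed call becomes a tagged
%% value instead of an exit in the calling process.
safe_call(ServerRef, Req, Timeout) ->
    try gen_server:call(ServerRef, Req, Timeout) of
        Reply -> {ok, Reply}
    catch
        exit:Reason -> {error, {'EXIT', Reason}}
    end.
```

For example, calling a locally registered name that does not exist should give a result of the form {error, {'EXIT', {noproc, _}}}.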
Bad nodes are those that did not return a successful result within Timeout:
calc_bad_nodes(Nodes, PosResults) ->
    { GoodNodes, _ } = lists:unzip(PosResults),
    [ BadNode || BadNode <- Nodes, not lists:member(BadNode, GoodNodes) ].
Results are collected by reading the mailbox with Timeout:
read_all(ReadList, Timeout) ->
    Now = erlang:monotonic_time(millisecond),
    Deadline = Now + Timeout,
    read_all_impl(ReadList, Deadline, []).
The implementation keeps reading until the Deadline passes. After that, it polls the remaining references with a zero timeout:
read_all_impl([], _, Results) ->
    lists:reverse(Results);
read_all_impl([ W | Rest ], expired, Results) ->
    R = read(0, W),
    read_all_impl(Rest, expired, [ R | Results ]);
read_all_impl([ W | Rest ] = L, Deadline, Results) ->
    Now = erlang:monotonic_time(millisecond),
    case Deadline - Now of
        Timeout when Timeout > 0 ->
            R = read(Timeout, W),
            case R of
                { ok, _ } ->
                    read_all_impl(Rest, Deadline, [ R | Results ]);
                { error, { read_timeout, _ } } ->
                    read_all_impl(Rest, expired, [ R | Results ])
            end;
        Timeout when Timeout =< 0 ->
            read_all_impl(L, expired, Results)
    end.
A single read is just a receive from the mailbox with Timeout:
read(Timeout, Ref) ->
    receive
        { Ref, Result } ->
            { ok, Result }
    after Timeout ->
        { error, { read_timeout, Timeout } }
    end.
Further improvements:
- The rpc module spawns a separate process so that late answers do not pile up as garbage in the caller's mailbox. It would be useful to do the same in this multicall function.
- An infinity timeout can be handled in the obvious way.
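Both improvements could look roughly like the sketch below. It is only an outline under assumptions, not part of the implementation above: the module multicall_sketch and the functions deadline/1, remaining/1 and in_middleman/1 are hypothetical names.

```erlang
-module(multicall_sketch).
-export([deadline/1, remaining/1, in_middleman/1]).

%% An infinity timeout has no deadline; otherwise the deadline is an
%% absolute point on the monotonic clock, in milliseconds.
deadline(infinity) ->
    infinity;
deadline(Timeout) when is_integer(Timeout), Timeout >= 0 ->
    erlang:monotonic_time(millisecond) + Timeout.

%% Value for an `after` clause: infinity stays infinity, anything
%% else is the time left, clamped at 0.
remaining(infinity) ->
    infinity;
remaining(Deadline) ->
    max(0, Deadline - erlang:monotonic_time(millisecond)).

%% Run Fun in a temporary process and return its result. Messages that
%% arrive for the temporary process after it has exited are dropped,
%% so late answers never reach the caller's mailbox.
in_middleman(Fun) ->
    {Pid, MRef} = spawn_monitor(fun() -> exit({result, Fun()}) end),
    receive
        {'DOWN', MRef, process, Pid, {result, Result}} ->
            Result;
        {'DOWN', MRef, process, Pid, Reason} ->
            exit(Reason)
    end.
```

With helpers like these, the whole call could be wrapped as in_middleman(fun() -> multicall(Nodes, Name, Req, Timeout) end), and the after clauses could use remaining(Deadline) so that infinity passes through unchanged.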