I am trying to use OTP style in my project and have a question about OTP interfaces. Which solution is more popular/idiomatic?

What I have:

  1. A web server built with mochiweb.
  2. One process that spawns many (1000-2000) children. Each child holds state (netflow speed). The process proxies messages to the children and creates new children when needed.

In mochiweb I have one page showing the speed of all actors. This is how it is built:

    nf_collector ! {get_abonents_speed, self()},
    receive
        {abonents_speed_count, AbonentsCount} ->
            ok
    end,
%% write http header, chunked
%% and while AbonentsCount != 0,  receive speed and write http

This is not OTP style, as far as I understand. Possible solutions:

  1. A synchronous API function that collects every speed and returns them all as one list. But I want to write each speed to the client as soon as it arrives.
  2. One argument of the API function is a callback:

    nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)

  3. Return an iterator: one of the results of get_all_speeds will be a function containing a receive block. Every call of it returns {ok, Speed}; at the end it returns {end}.

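get_all_speeds() ->
    nf_collector ! {get_abonents_speed, self()},
    receive
        {abonents_speed_count, AbonentsCount} ->
            ok
    end,
    {ok, fun() ->
        create_receive_fun(AbonentsCount)
    end}.

%% 'end' is a reserved word in Erlang, so the atom has to be quoted.
create_receive_fun(0) ->
    {'end'};
create_receive_fun(Count) ->
    receive
        {abonent_speed, Speed} ->
            %% Hand back the next step as a fun so nothing is received
            %% until the caller asks for it.
            {ok, Speed, fun() -> create_receive_fun(Count - 1) end}
    end.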
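
A minimal sketch of how a caller might consume an iterator like the one in option 3. It assumes each step is a zero-arity fun that returns either {ok, Speed, NextFun} or {'end'}. The module name speed_iter and the helper from_list/1 are hypothetical and exist only so the loop can be tried without nf_collector.

```erlang
-module(speed_iter).
-export([fold/3, from_list/1]).

%% Walk the iterator, applying Fun(Speed, Acc) to every speed.
%% Assumed step shape: fun() -> {ok, Speed, NextFun} | {'end'}.
fold(Fun, Acc, Next) when is_function(Next, 0) ->
    case Next() of
        {ok, Speed, Next1} -> fold(Fun, Fun(Speed, Acc), Next1);
        {'end'} -> Acc
    end.

%% Hypothetical helper: build an iterator of the same shape from a list.
from_list([]) ->
    fun() -> {'end'} end;
from_list([H | T]) ->
    fun() -> {ok, H, from_list(T)} end.
```

With such a fold, the mochiweb handler could pass a fun that calls Resp:write_chunk(templater(Speed)) for each speed and ignores the accumulator.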

What is the actual question? Do you have trouble implementing any of the options? If so, you should ask something related to that. Otherwise, this will be primarily opinion-based, depending completely on your use case. - Adam Lindberg
I agree with Adam and see this more as a design problem, but there is not enough information to give any advice. Why are there 1000-2000 children containing state? Why is there a process returning a count and issuing the calls, instead of returning those children and letting the caller decide what to do? What will the read/write ratio be? What are the non-functional requirements: is low latency or throughput more important? Is this the main functionality, or how big a fraction of the rest of the system is it? And so on. It doesn't make much sense to me without additional information. - Hynek -Pichi- Vychodil
The reason for this question is simple: Erlang gives an easy way to write actor-based programs, and OTP gives standardization. At first I wrote the program without OTP and its logic was complicated to understand. After adding OTP it became flat and simple. Here I have a behaviour more complex than plain sync/async calls, and I am asking which approach other Erlang developers choose when they meet a similar problem. Or does this problem come from bad design, and the answer is: keep it simple, without 1000 (or more) actors? - kolko
To rephrase: if my call depends on many actors, I must handle many cases: 1. one or more actors die while my call is receiving messages; 2. the call initiator dies; 3. two, three or more calls are made at the same time; 4. others... Is there any OTP behaviour, coding standard or recommendation for how to do this, so that the eyes of other Erlang coders who look at my code do not start bleeding? :) If not, then true, I can do it in many ways, so... I can close this question. - kolko

1 Answer


Spawn your 'children' from a supervisor:

-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
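%% The restart intensity (5 restarts in 10 seconds) is an assumed value; tune it for your application.
init([]) -> {ok, {{simple_one_for_one, 5, 10}, [{ch, {ch, start_link, []}, transient, 1000, worker, [ch]}]}}.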
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).

Start them with ch_sup:start_child/1 (Data is whatever).

Implement your children as a gen_server:

-module(ch).
-behaviour(gen_server).
-record(?MODULE, {speed}).

...

get_speed(Pid, Timeout) ->
    try
        gen_server:call(Pid, get, Timeout)
    catch
        exit:{timeout, _} -> timeout;
        exit:{noproc, _} -> died
    end
.

...

handle_call(get, _From, St) -> {reply, {ok, St#?MODULE.speed}, St}.

You can now use the supervisor to get the list of running children and query them, though you have to accept the possibility of a child dying between getting the list of children and calling them, and obviously a child could for some reason be alive but not respond, or respond with an error, etc.
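
A small sketch of the "get the list of running children" step, written defensively. supervisor:which_children/1 returns {Id, Child, Type, Modules} tuples in which Child can be the atom restarting or undefined instead of a pid, so this version keeps only real pids. The module name ch_pids and its function names are hypothetical.

```erlang
-module(ch_pids).
-export([live_pids/1, pids_from/1]).

%% Pids of the worker children currently running under supervisor Sup.
live_pids(Sup) ->
    pids_from(supervisor:which_children(Sup)).

%% Separated from live_pids/1 so it can be tried on a hand-written list.
%% Entries whose Child is 'restarting' or 'undefined' are dropped.
pids_from(Children) ->
    [Pid || {_Id, Pid, worker, _Mods} <- Children, is_pid(Pid)].
```

ch_pids:live_pids(ch_sup) could then stand in for the inner comprehension over supervisor:which_children(ch_sup). A child can still die between the listing and the call, so get_speed/2 has to handle that case regardless.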

The get_speed/2 function above returns either {ok, Speed}, died or timeout. It remains for you to filter appropriately according to your application's needs. That is easy with a list comprehension; here are a few.

Just the speeds:

[Speed || {ok, Speed} <- [ch:get_speed(Pid, 1000) || Pid <-
    [Pid || {undefined, Pid, worker, [ch]} <-
        supervisor:which_children(ch_sup)
        ]
    ]].

Pid and speed tuples:

[{Pid, Speed} || {Pid, {ok, Speed}} <-
    [{Pid, ch:get_speed(Pid, 1000)} || Pid <-
        [Pid || {undefined, Pid, worker, [ch]} <-
                supervisor:which_children(ch_sup)]
        ]
    ].

All results, including timeouts and 'died' results for children that died before you got to them:

[{Pid, Any} || {Pid, Any} <-
    [{Pid, ch:get_speed(Pid, 1000)} || Pid <-
        [Pid || {undefined, Pid, worker, [ch]} <-
                supervisor:which_children(ch_sup)]
        ]
    ].

In most situations you almost certainly don't want anything other than the speeds, because what are you going to do about deaths and timeouts? You want those that die to be respawned by the supervisor, so the problem is more or less fixed by the time you know about it, and timeouts, as with any fault, are a separate problem, to be dealt with in whatever way you see fit... There's no need to mix the fault fixing logic with the data retrieval logic though.

Now, the problem with all of these, which I think you were getting at in your post, though I'm not quite sure, is that the timeout of 1000 applies to each call, and the calls are made synchronously, one after the other. So for 1000 children with a 1 second timeout, it could take 1000 seconds to produce no results. Making the timeout 1ms might be the answer, but doing it properly is a bit more complicated:

get_speeds() ->
    ReceiverPid = self(),
    Ref = make_ref(),
    Pids = [Pid || {undefined, Pid, worker, [ch]} <-
            supervisor:which_children(ch_sup)],
    lists:foreach(
        fun(Pid) -> spawn(
            fun() -> ReceiverPid ! {Ref, ch:get_speed(Pid, 1000)} end
            ) end,
        Pids),
    receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.

receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
    [];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
    Time = os_milliseconds(),
    %% Clamp at 0: a negative value in 'after' raises an error.
    TimeLeft = max(0, Timeout - Time + StartTime),
    receive
        {Ref, acc_timeout} ->
            [];
        {Ref, {ok, Speed}} ->
            [Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
        {Ref, _} ->
            receive_speeds(Ref, Remaining-1, StartTime, Timeout)
    after TimeLeft ->
        []
    end
.

os_milliseconds() ->
    %% os:timestamp() returns {MegaSecs, Secs, MicroSecs}.
    {OsMegSecs, OsSecs, OsMicroSecs} = os:timestamp(),
    (OsMegSecs * 1000000 + OsSecs) * 1000 + OsMicroSecs div 1000
.

Here each call is spawned in a different process and the replies collected, until the 'master timeout' or they have all been received.
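
If the goal is to write each speed to the HTTP client as soon as it arrives, rather than build a list, the same pattern can take a callback. This is a sketch under assumptions, not tested against the real system: the module name speed_stream and the function each_speed/3 are hypothetical, the work is passed in as zero-arity funs so the example does not depend on ch or ch_sup, and it uses erlang:monotonic_time/1 for the deadline instead of os:timestamp/0.

```erlang
-module(speed_stream).
-export([each_speed/3]).

%% Jobs:    list of zero-arity funs, each returning {ok, Speed} or anything else.
%% OnSpeed: callback run in the calling process for every {ok, Speed} reply.
%% Timeout: overall deadline in milliseconds for all jobs together.
%% Returns the number of speeds passed to OnSpeed.
each_speed(Jobs, OnSpeed, Timeout) ->
    Parent = self(),
    Ref = make_ref(),
    lists:foreach(
        fun(Job) -> spawn(fun() -> Parent ! {Ref, Job()} end) end,
        Jobs),
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    loop(Ref, length(Jobs), OnSpeed, Deadline, 0).

loop(_Ref, 0, _OnSpeed, _Deadline, Count) ->
    Count;
loop(Ref, Remaining, OnSpeed, Deadline, Count) ->
    %% Never pass a negative value to 'after'.
    TimeLeft = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {Ref, {ok, Speed}} ->
            OnSpeed(Speed),
            loop(Ref, Remaining - 1, OnSpeed, Deadline, Count + 1);
        {Ref, _Other} ->
            %% died, timeout, or any other non-speed reply: skip it.
            loop(Ref, Remaining - 1, OnSpeed, Deadline, Count)
    after TimeLeft ->
        Count
    end.
```

A call could look like speed_stream:each_speed([fun() -> ch:get_speed(Pid, 1000) end || Pid <- Pids], fun(Speed) -> Resp:write_chunk(templater(Speed)) end, 1000). Replies that arrive after the deadline stay in the caller's mailbox tagged with the old Ref, so a long-lived caller may want to run the whole collection in a short-lived process instead.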

Code has largely been cut-n-pasted from various works I have lying round, and edited manually and by search replace, to anonymise it and remove surplus, so it's probably mostly compilable quality, but I don't promise I didn't break anything.