1
votes

I have a simple application target_interceptor that upon receiving register and unregister messages, starts or terminates workers under simple_one_for_one rpc_server_supervisor.

The code of supervisor rpc_server_supervisor:

init([]) ->
    MaxRestart = 5,
    MaxTime = 3600,
    {ok, {{simple_one_for_one, MaxRestart, MaxTime},
            [{rbmq_rpc_server, 
                {rbmq_rpc_server, start_link, []},
                temporary,
                10000,
                worker,
                [rbmq_rpc_server]}]}}.

register message of target_interceptor:

handle_cast({register, Args}, S = #state{channel = Channel, supervisor=Sup, refs=R, qs_link = QSLinks}) ->
    {Connection, QueueName, Node} = Args,   
    {Ok, Pid} = supervisor:start_child(Sup, [Connection, QueueName, Node]),
    Ref = erlang:monitor(process, Pid),
{noreply, S#state{refs=gb_sets:add(Ref,R), qs_link=orddict:append(binary_to_list(QueueName),Pid,QSLinks)}};

unregister message of target_interceptor:

handle_cast({unregister,{QueueName}}, S = #state{supervisor = Sup, qs_link = QSLinks}) ->
    Pid = orddict:fetch(QueueName,QSLinks) 
    case supervisor:terminate_child(Sup,Pid) of
         ok -> Success = true;
         Error -> io:format("Error ~p~n",[Error]),
                  Success = false
    end,
{noreply, S#state{qs_link=orddict:erase(QueueName,QSLinks)}}

my version of Erlang is : R15B01

The first problem is when handling register action the tuple {OK, Pid} = {error, <0.57.0>}; even though it indicates that something went wrong, the gen_server rbmq_rpc_server on Pid 57 is working correctly and responding to messages. Why is the return value of start_child function error? What went wrong?

The second problem is when handling unregister action, where supervisor:terminate_child(Sup,Pid) returns {error,simple_one_for_one) even though I reference Pid not ChildID. Why is it behaving like this and how could I dynamically and individually terminate children of my supervisor?

EDIT: Both target_interceptor and rpc_server_supervisor are supervised by rbmq_sup_sup supervisor :

init({Nodeid, Node}) ->
    MaxRestart = 1,
    MaxTime = 3600,
    {ok, {{rest_for_one, MaxRestart, MaxTime},
            [{server,
                {target_interceptor, start_link, [target, self(), {Nodeid, Node}]},
                permanent,
                5000,
                worker,
                [target_interceptor]}]}}.

EDIT: rpc_server_supervisor is called in target_interceptor init() function (Sup here is rbmq_sup_sup supervisor):

handle_info({start_worker_supervisor, Sup}, S = #state{}) ->
    {ok, Pid} = supervisor:start_child(Sup, ?SPEC),
    link(Pid),
    {noreply, S#state{sup=Pid}};

-define(SPEC,
        {rpc_server_sup,
            {rpc_server_sup, start_link, []},
            temporary,
            10000,
            supervisor,
            [rpc_server_sup]}).
1

1 Answers

1
votes

I will add to this as I get time.

A first point is that {Ok,Pid} matches any tuple even {error,Error} so calling the variable Ok might not be the best choice.

A quick question: in handle_cast({register,Args}, ... you do binary_to_list(QueueName) for the key but in handle_cast({unregister,{QueueName}}, ... you just use QueueName for the key. Why? Why do you keep a list of pids for each QueueName as unregister seems to clear them all? This will also mean that when you do Pid = orddict:fetch(QueueName,QSLinks) Pid will be a list of pids not one.

In rbmq_sup_sup you only start target_interceptor.

Are the rbmq_rpc_server processes registered?

EDIT:

I think the reason for error when doing unregister is that you have saved the pid with orddict:append/3 which saves the value in a list, even the first time, while you are getting the value with orddict:fetch/2 which returns the whole list in this case. So Pid is a list. You then try kill the child with Pid which is a list of the pid not the pid which is what supervisor:terminate_child/2 is complaining about.

If you only have one pid per QueueName then you should do orddict:store(binary_to_list(QueueName), Pid, QSLinks) instead. Then orddict:fetch/2 will return the pid.

P.S. The idea behind orddict:append/3 is that you can add more values to the same key and it keeps a list of all the values.