4
votes

A module calls a gen_server to handle a stream; the gen_server uses a record as its State.

handle_call processes the stream using a function stored in State, which separates a completed piece of data from the remaining tail.

Now next time, the tail should be fed first but with an updated State, before the module sends in more data.

%% Sketch from the question: feeds one chunk of stream data to the parser fun
%% stored in the state record. The reply tuples and the closing `end` are
%% omitted, as in the original sketch.
handle_call({stream, Data}, _From, State = #mystate{myfun=Fun}) ->
    case Fun(Data) of
       {completed piece,tail} ->
           dosomethingwithpieace,
           % A complete piece was produced: store a freshly reset parser fun.
           NewState = State#mystate{myfun=resetfun()};
           % How do I call this again to feed Tail first with the new state?
       {stillstreaming, CurrentState} ->
           % Still streaming: store the returned value as the new parser fun.
           NewState = State#mystate{myfun=CurrentState};

I cannot call gen_server:call(self(),{stream, Tail}) because State needs to be updated first. And I cannot reply with a new State, because module will send in more data and tail will disappear.

Is there a way to call it again with updated State without replying with tail and feeding tail back from module??

Update,Code:

% caller module
case gen_tcp:recv(Socket, 0) of % Length is 0 because, per the author, a fixed Length blocks until that many bytes have been read
    {error, closed} -> % author's note: nothing was stored in mnesia yet, so the cleanup is done here
        gen_server:stop(Pid),
        io:format("Client died in pre_loop~n");
    {ok, Packet} ->
        % Hand the raw bytes to the server and act on its verdict.
        case gen_server:call(Pid, {handle_init,Socket,Packet}) of
            {ok, continue} ->
                % Not logged in yet: keep reading from the socket.
                pre_loop(Socket, Pid);
            {ok, logged_in} ->
                {UserId, UserName} = get_user_data(), % placeholder, per the author
                receiver_loop(UserId, UserName, Socket, Pid);
            {stop, Reason} ->
                io:format("Error at pre_loop: ~p~n", [Reason]);
            Other ->
                io:format("Unknown response from call in pre-loop: ~p~n", [Other])
        end
end.

and gen_server module:

% gen_server module
%% Synchronous requests.
%%
%% {handle_init, Socket, Data}: applies the continuation fun stored in
%% `data_fun` to Data and replies with one of
%%   {ok, continue}          - more input is needed, or the handled term did
%%                             not result in a login
%%   {ok, logged_in}         - handle_input_init/2 reported a successful login
%%   {stop, malformed_data}  - the fun failed or returned an unexpected value
%% The new continuation is stored in the state on the two `ok` replies.
%%
%% Any other request is answered with {stop, unknown_call}.
handle_call({handle_init, _Socket, Data}, _From, State = #server_state{data_fun = {incomplete, Fun}}) ->
    % `try ... of` protects only Fun(Data). A thrown value is therefore never
    % mistaken for a normal result, and failures inside handle_input_init/2
    % are not masked.
    try Fun(Data) of
        {incomplete, F} ->
            {reply, {ok, continue}, State#server_state{data_fun = {incomplete, F}}};
        {with_tail, Term, Tail} ->
            % handle Term login/register only
            case handle_input_init(Term, Tail) of
                {incomplete, Fn, logged_in} ->
                    {reply, {ok, logged_in}, State#server_state{data_fun = {incomplete, Fn}}};
                {incomplete, Fn} ->
                    {reply, {ok, continue}, State#server_state{data_fun = {incomplete, Fn}}};
                {stop, malformed_data} ->
                    {reply, {stop, malformed_data}, State}
            end;
        _ ->
            {reply, {stop, malformed_data}, State}
    catch
        % Data comes from the client, so any failure while applying Fun is
        % reported to the caller as malformed input instead of crashing.
        _:_ ->
            {reply, {stop, malformed_data}, State}
    end;

handle_call(_Message, _From, State = #server_state{}) ->
    {reply, {stop , unknown_call}, State}.

%% Handles one decoded Term and then continues with the remaining Tail.
%% The outcome of handle_term_init/1 picks the notice to print and whether the
%% tail is processed in logged-in mode (handle_tail_init/2) or not
%% (handle_tail_init/1). Returns whatever the chosen tail handler returns.
handle_input_init(Term, Tail) ->
    {Notice, Mode} =
        case handle_term_init(Term) of
            {ok, login_passed} ->
                {"send user a login pass msg", logged_in};
            {error, login_failed} ->
                {"send user a login failed error~n", anonymous};
            {ok, registration_passed} ->
                {"send user a registeration passed msg", anonymous};
            {error, registration_failed} ->
                {"send user a registeration failed error", anonymous};
            {error, invalidreq} ->
                {"send user an invalid requst error~n", anonymous}
        end,
    io:format(Notice),
    case Mode of
        logged_in -> handle_tail_init(Tail, logged_in);
        anonymous -> handle_tail_init(Tail)
    end.

%% Tries to decode the next term from Tail with jsx (options: stream,
%% return_tail, return_maps).
%% Returns
%%   {incomplete, F}          - as returned by jsx:decode/2
%%   the result of handle_input_init/2 when a term and a new tail are returned
%%   {stop, malformed_data}   - decoding failed or returned anything else
handle_tail_init(Tail) ->
    % Only the decode call is protected. The call in the `of` section runs
    % outside the try, so its failures are not masked as malformed input.
    try jsx:decode(Tail, [stream, return_tail, return_maps]) of
        {incomplete, F} ->
            {incomplete, F};
        {with_tail, Term, Tail2} ->
            handle_input_init(Term, Tail2);
        _ ->
            {stop, malformed_data}
    catch
        % Tail is client-supplied, so a decoder failure is reported as
        % malformed input.
        _:_ ->
            {stop, malformed_data}
    end.

%% Logged-in variant: every further term found in Tail is answered with an
%% "invalid request" notice and otherwise ignored, because the login has
%% already happened.
%% Returns
%%   {incomplete, F, logged_in} - jsx:decode/2 returned {incomplete, F}
%%   {stop, malformed_data}     - decoding failed or returned anything else
handle_tail_init(Tail, logged_in) ->
    % Only the decode call is protected; the recursive call in the `of`
    % section runs outside the try.
    try jsx:decode(Tail, [stream, return_tail, return_maps]) of
        {incomplete, F} ->
            {incomplete, F, logged_in};
        {with_tail, _Term, Tail2} ->
            io:format("send user an invalid requst error~n"),
            handle_tail_init(Tail2, logged_in);
        _ ->
            {stop, malformed_data}
    catch
        % Tail is client-supplied, so a decoder failure is reported as
        % malformed input.
        _:_ ->
            {stop, malformed_data}
    end.

%% Dispatches a decoded request map.
%% A <<"Login">> or <<"Register">> key holding a two-element list is passed to
%% login_user/2 or register_user/2 respectively; any other term yields
%% {error, invalidreq}.
handle_term_init(#{<<"Login">> := [UserName, Password]}) ->
    login_user(UserName, Password);
handle_term_init(#{<<"Register">> := [UserName, Password]}) ->
    register_user(UserName, Password);
handle_term_init(_Other) ->
    {error, invalidreq}.

It is working as expected, but this is my very first Erlang code ever, and I am positive that it can be simplified to a single recursive handle_call while maintaining OTP style, which is the reason I chose Erlang.

1
Never use pronouns when writing anything--who knows what it and this refer to. Well, you know--but you are the only one. You can save anything you want in State: expand it to a 1,000 element tuple if needed where one element is your record, then you can use the other 999 slots to store pieces of anything you want--including your disappearing tail. Also, I don't think your question has anything to do with streams, so create a simple example that demonstrates your problem without using streams, e.g. handle_call({go, Data}, _From, Func) -> case Func(Data) of {X, Y}7stud
Its not my tail xD and yes i know i can but this does not solve the problem, i can use thousands of tuples or write multiple function to solve it but it will create many more problems i.e. what if the tail contains a complete piece of data or is a complete piece itself? i will have to handle completed pieces everywhere in addition to writing complex code.asim

1 Answers

3
votes

I cannot call gen_server:call(self(),{stream, Tail}) because State needs to be updated first.

I can't really understand what you are trying to say, but if you mean:

I cannot recursively call gen_server:call(self(),{stream, Tail}), i.e. I cannot write code in handle_call() that recursively calls handle_call().

then you can certainly send all the data inside handle_call() to another function that recursively calls itself:

%% Skeleton (pseudo-code): compute the first result and state here, then pass
%% them with the remaining Tail to helper_func/3 and reply with what it returns.
handle_call(...) ->

    ...
    NewState = ....
    Result = ...
    %% helper_func/3 returns the final {Result, State} pair
    {FinalResult, FinalState} = helper_func(Result, NewState, Tail)
    {reply, FinalResult, FinalState}

%% Skeleton (pseudo-code): walks the list given as the third argument,
%% carrying the result and state along.
%% Base case: the list is empty, so return the current result and state.
helper_func(Result, State, []) ->
    {Result, State};
%% Recursive case: handle one Piece, then continue with the rest of the list.
helper_func(Result, State, [Piece|Tail]) ->
    ...
    NewState = ...
    NewResult = ...
    helper_func(NewResult, NewState, Tail).  %% Recursive call here