0
votes

I am writing an ejabberd module in Erlang that supports three operations on scheduled messages: add, edit and delete.

I can see in the logs that the add function is being called from the init function, but I can't find any log entries related to deleting messages. I guess this is because the terminate callback is not being called, but I am not sure whether my function is correct, or whether I am calling the edit and delete functions in the right place.

this is my code:

-module(mod_msgschedule).

-behaviour(gen_server).
-behaviour(gen_mod).

-include("ejabberd.hrl").
-include("jlib.hrl").

%% gen_mod handlers
-export([start/2,stop/1]).

%% gen_server handlers
-export([init/1,handle_info/2, handle_call/3, handle_cast/2, terminate/2, code_change/3]).

%% Hook handlers
-export([
    remove_delayed_message/2,
    add_delayed_message/7,
    edit_delayed_message/7,
    search_delayed_messages/2,
    check_packet/1,
    process_sm_iq/3]).

-export([start_link/2]).

%% Period of the `tick' message that init/1 schedules with timer:send_interval/3.
-define(INTERVAL, timer:minutes(5)).
%% Base name passed to gen_mod:get_module_proc/2 to derive the per-vhost process name.
-define(PROCNAME, ?MODULE).
%% Namespace used when registering/removing the IQ handler and when matching IQs.
%% NOTE(review): this is a list string while the rest of the module works with
%% binaries (e.g. <<"message">>) -- confirm it should not be <<"delayed-msg">>.
-define(NS_DELAYMSG, "delayed-msg").
%% gen_server state: the virtual host this process was started for.
-record(state,{host :: binary()}).

%% A scheduled message; row_to_record/1 builds one from each selected
%% delayed_message row.
-record(delayed_msg, {id,from,to,server,scheduledTimestamp,packet,relativeId}).

%%--------------------------------------------------------------------
%% gen_mod callbacks
%%--------------------------------------------------------------------
%% gen_mod callback: raises this module's log level to 5 and starts the
%% per-vhost gen_server as a transient worker under ejabberd_sup.
%% Returns whatever supervisor:start_child/2 returns.
start(VHost, Opts) ->
    ejabberd_loglevel:set_custom(?MODULE, 5),
    ?DEBUG("Start Module", []),
    ChildId = gen_mod:get_module_proc(VHost, ?PROCNAME),
    StartMFA = {?MODULE, start_link, [VHost, Opts]},
    supervisor:start_child(
      ejabberd_sup,
      {ChildId, StartMFA, transient, 1000, worker, [?MODULE]}).

%% gen_mod callback: terminates the per-vhost worker and then removes its
%% child specification from ejabberd_sup.
%% Returns the result of supervisor:delete_child/2.
stop(VHost) ->
    ChildId = gen_mod:get_module_proc(VHost, ?PROCNAME),
    supervisor:terminate_child(ejabberd_sup, ChildId),
    supervisor:delete_child(ejabberd_sup, ChildId).


%% Starts the gen_server for VHost, locally registered under the name
%% derived by gen_mod:get_module_proc/2. This is the start function
%% referenced by the child specification built in start/2.
start_link(VHost, Opts) ->
    RegisteredName = {local, gen_mod:get_module_proc(VHost, ?PROCNAME)},
    gen_server:start_link(RegisteredName, ?MODULE, [VHost, Opts], []).

%% gen_server init callback.
%%
%% In order, it:
%%   * sets trap_exit for this process;
%%   * registers check_packet/1 on the filter_local_packet hook (priority 10);
%%   * asks the timer server to send `tick' to this process every ?INTERVAL;
%%   * registers process_sm_iq/3 with ejabberd_sm for ?NS_DELAYMSG, using the
%%     `iqdisc' module option (default one_queue).
%%
%% The state only records the virtual host.
init([VHost, Opts]) ->
    ?DEBUG("Start Timer", []),
    process_flag(trap_exit, true),
    ejabberd_hooks:add(filter_local_packet, VHost, ?MODULE, check_packet, 10),
    timer:send_interval(?INTERVAL, self(), tick),
    Discipline = gen_mod:get_opt(iqdisc, Opts, one_queue),
    gen_iq_handler:add_iq_handler(ejabberd_sm, VHost, ?NS_DELAYMSG,
                                  ?MODULE, process_sm_iq, Discipline),
    {ok, #state{host = VHost}}.

%% gen_server terminate callback: undoes the registrations made in init/1.
%%
%% The IQ handler is removed from ejabberd_sm because that is the component
%% init/1 registered it with; removing it from ejabberd_local (as this
%% function used to do) leaves the ejabberd_sm handler installed after the
%% process is gone.
%%
%% The return value of terminate/2 is ignored by gen_server, so `ok' is
%% returned explicitly.
terminate(_Reason, #state{host = VHost}) ->
    ejabberd_hooks:delete(filter_local_packet, VHost, ?MODULE, check_packet, 10),
    gen_iq_handler:remove_iq_handler(ejabberd_sm, VHost, ?NS_DELAYMSG),
    ok.

%% gen_server handle_call callback.
%%   get_state -> replies {ok, State} without changing it
%%   stop      -> replies ok and stops the server with reason normal
%%   anything  -> replies bad_request
handle_call(get_state, _Caller, Current) ->
    {reply, {ok, Current}, Current};
handle_call(stop, _Caller, Current) ->
    {stop, normal, ok, Current};
handle_call(_Unknown, _Caller, Current) ->
    {reply, bad_request, Current}.

%% gen_server handle_info callback.
%% A `tick' message (scheduled in init/1) triggers delivery of the messages
%% that are due; every other message is ignored.
handle_info(tick, State) ->
    {noreply, send_pending_delayed_messages(State)};
handle_info(_Other, State) ->
    {noreply, State}.

%% gen_server handle_cast callback: no casts are supported, so every
%% request is ignored and the state is left untouched.
handle_cast(_Ignored, Current) ->
    {noreply, Current}.

%% gen_server code_change callback: the state needs no conversion, so it is
%% returned unchanged.
code_change(_FromVsn, Current, _Extra) ->
    {ok, Current}.

%% Called from handle_info/2 on every `tick'.
%% Looks up the stored messages whose scheduled time is earlier than the
%% current time (in microseconds), routes each of them and then deletes it.
%% A failed lookup is only logged. The state is returned unchanged.
send_pending_delayed_messages(State) ->
    Host = jlib:nameprep(State#state.host),
    NowMicros = now_to_microseconds(erlang:now()),
    ?DEBUG("Timer Triggered!! ~p", [NowMicros]),
    case search_delayed_messages(Host, NowMicros) of
        {error, Reason} ->
            ?DEBUG("Select command: ~p", [{error, Reason}]);
        {ok, DueMessages} ->
            Deliver = fun(Msg) ->
                              route_scheduled_message(Host, Msg),
                              remove_delayed_message(Host, Msg)
                      end,
            lists:foreach(Deliver, DueMessages)
    end,
    State.

%% Routes a stored message from its sender to its recipient. The stanza
%% that is routed is the stored packet as returned by
%% resend_scheduled_message_packet/2.
route_scheduled_message(Server, #delayed_msg{from = Sender, to = Recipient} = Msg) ->
    Stanza = resend_scheduled_message_packet(Server, Msg),
    ejabberd_router:route(Sender, Recipient, Stanza).

%% Returns the stored packet of a delayed message with its scheduled
%% timestamp attached (see add_timestamp/3).
resend_scheduled_message_packet(Server,
                                #delayed_msg{scheduledTimestamp = ScheduledAt,
                                             packet = Stanza}) ->
    add_timestamp(ScheduledAt, Server, Stanza).

%% Appends a timestamp sub-element to Packet.
%%   undefined               -> Packet is returned untouched
%%   {Mega, Secs, Micro} now -> converted to universal time, extended with
%%                              the microsecond part, rendered by
%%                              timestamp_xml/2 and appended as a child
add_timestamp(undefined, _Server, Packet) ->
    Packet;
add_timestamp({_Mega, _Secs, MicroSecs} = Now, Server, Packet) ->
    {Date, {Hour, Minute, Second}} = calendar:now_to_universal_time(Now),
    Stamp = timestamp_xml(Server, {Date, {Hour, Minute, Second, MicroSecs}}),
    xml:append_subtags(Packet, [Stamp]).

%% Builds the timestamp element for Time (UTC), stamped with the bare
%% server JID as origin and the description <<"Offline Storage">>.
timestamp_xml(Server, Time) ->
    ServerJID = jlib:make_jid(<<>>, Server, <<>>),
    jlib:timestamp_to_xml(Time, utc, ServerJID, <<"Offline Storage">>).

%%--------------------------------------------------------------------
%% Hook handler
%%--------------------------------------------------------------------

%% filter_local_packet hook handler.
%%
%% A <message/> of type "chat" or "groupchat" that carries a non-empty
%% <scheduled_time/> child is treated as a scheduling command:
%%   * <delayed_msg_action/> selects "edit", "delete" or (anything else) add;
%%   * <delayed_msg_id/> is the sender-relative id of the stored message;
%%   * <scheduled_time/> is an integer offset that is added to the current
%%     time in microseconds.
%% The command is applied to the database and the packet is dropped.
%%
%% Everything else is returned unchanged. That includes values that are not
%% a {From, To, #xmlel{}} triple, such as a `drop' produced by an earlier
%% handler of the same hook.
%%
%% Changes compared to the previous version:
%%   * "delete" now hands the sender's #jid{} to remove_delayed_message/2,
%%     which reads the `luser' field of that record; passing a plain string
%%     made every delete crash with badrecord;
%%   * a scheduled_time that is not a plain integer is logged and the packet
%%     is passed through, instead of crashing with case_clause;
%%   * the identical chat/groupchat branches share one implementation.
check_packet({From, To, #xmlel{name = <<"message">>} = XML} = Packet) ->
    case xml:get_tag_attr_s(<<"type">>, XML) of
        Type when Type =:= <<"chat">>; Type =:= <<"groupchat">> ->
            maybe_schedule(From, To, XML, Packet);
        _ ->
            Packet
    end;
check_packet(Packet) ->
    Packet.

%% Applies the scheduling command carried by XML, if there is one.
%% Returns `drop' when a command was applied, Packet otherwise.
maybe_schedule(From, To, XML, Packet) ->
    case binary_to_list(xml:get_path_s(XML, [{elem, <<"scheduled_time">>}, cdata])) of
        "" ->
            Packet;
        DeltaS ->
            case string:to_integer(DeltaS) of
                {Delta, []} when is_integer(Delta) ->
                    apply_delayed_action(From, To, XML, Delta),
                    drop;
                _ ->
                    %% Not a plain integer: leave the stanza alone rather
                    %% than crash inside the hook.
                    ?INFO_MSG("Ignoring invalid scheduled_time ~p", [DeltaS]),
                    Packet
            end
    end.

%% Runs the add/edit/delete database operation requested by XML.
apply_delayed_action(#jid{lserver = LServer} = From, To, XML, Delta) ->
    RelativeId = binary_to_list(xml:get_path_s(XML, [{elem, <<"delayed_msg_id">>}, cdata])),
    Action = binary_to_list(xml:get_path_s(XML, [{elem, <<"delayed_msg_action">>}, cdata])),
    ScheduledTimestamp = from_now_to_microseconds(erlang:now(), Delta),
    %% Strip the <scheduled_time/> marker so the stored stanza is not
    %% scheduled again when it is finally routed.
    Children = XML#xmlel.children,
    Marker = lists:keyfind(<<"scheduled_time">>, #xmlel.name, Children),
    NewXML = XML#xmlel{children = lists:delete(Marker, Children)},
    FromUser = binary_to_list(From#jid.luser),
    ToUser = binary_to_list(To#jid.luser),
    ToServer = binary_to_list(To#jid.lserver),
    case Action of
        "edit" ->
            edit_delayed_message(LServer, FromUser, ToUser, ToServer,
                                 ScheduledTimestamp, NewXML, RelativeId);
        "delete" ->
            remove_delayed_message(LServer,
                                   #delayed_msg{from = From,
                                                relativeId = RelativeId});
        _ ->
            add_delayed_message(LServer, FromUser, ToUser, ToServer,
                                ScheduledTimestamp, NewXML, RelativeId)
    end.


%% IQ handler registered with ejabberd_sm for ?NS_DELAYMSG.
%%   get in ?NS_DELAYMSG -> result carrying a fixed <value/> element
%%   set                 -> empty result
%%   anything else       -> feature-not-implemented error
%%
%% The <value/> reply is built as an #xmlel{} with binary name and cdata,
%% the element representation used everywhere else in this module, instead
%% of the legacy {xmlelement, "value", ...} tuple with list strings.
process_sm_iq(_From, _To, #iq{type = get, xmlns = ?NS_DELAYMSG} = IQ) ->
    ?INFO_MSG("Processing IQ Get query:~n ~p", [IQ]),
    Value = #xmlel{name = <<"value">>,
                   attrs = [],
                   children = [{xmlcdata, <<"Hello World of Testing.">>}]},
    IQ#iq{type = result, sub_el = [Value]};
process_sm_iq(_From, _To, #iq{type = set} = IQ) ->
    ?INFO_MSG("Processing IQ Set: it does nothing", []),
    IQ#iq{type = result, sub_el = []};
process_sm_iq(_From, _To, #iq{sub_el = SubEl} = IQ) ->
    ?INFO_MSG("Processing IQ other type: it does nothing", []),
    IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}.

%%--------------------------------------------------------------------
%% ODBC Functions
%%--------------------------------------------------------------------


%% Deletes the stored message identified by its sender and sender-relative
%% id, then logs the query result.
%%
%% The `from' field of the record may be a #jid{} (as produced by
%% row_to_record/1), or a bare username given as a binary or a list; the
%% previous version crashed with badrecord on anything but a #jid{}.
%% `relativeId' may be a list or a binary.
remove_delayed_message(LServer, #delayed_msg{from = From, relativeId = RelativeId}) ->
    Owner = delayed_msg_owner(From),
    QR = ejabberd_odbc:sql_query(
           LServer,
           [<<"delete from delayed_message where from_jid = '">>,
            ejabberd_odbc:escape(Owner),
            <<"' and relative_id = '">>,
            ejabberd_odbc:escape(iolist_to_binary(RelativeId)),
            <<"';">>]),
    ?DEBUG("DELETE ~p", [QR]).

%% Normalises the sender of a delayed message to the binary username that
%% is stored in the from_jid column.
delayed_msg_owner(#jid{luser = LUser}) -> LUser;
delayed_msg_owner(User) when is_binary(User) -> User;
delayed_msg_owner(User) when is_list(User) -> list_to_binary(User).

%% Builds the parenthesised SQL value tuple for one delayed_message row.
%% String arguments are given as lists and are escaped; the timestamp is an
%% integer and is written unquoted; the packet is serialised to XML text.
prepare_delayed_message(SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket, SRelativeId) ->
    FromSql = ejabberd_odbc:escape(list_to_binary(SFromUserName)),
    ToSql = ejabberd_odbc:escape(list_to_binary(SToUsername)),
    ServerSql = ejabberd_odbc:escape(list_to_binary(SServer)),
    TimeSql = integer_to_list(SScheduledTimestamp),
    PacketSql = ejabberd_odbc:escape(xml:element_to_binary(SPacket)),
    RelativeIdSql = ejabberd_odbc:escape(list_to_binary(SRelativeId)),
    [<<"('">>, FromSql,
     <<"', '">>, ToSql,
     <<"', '">>, ServerSql,
     <<"', ">>, TimeSql,
     <<", '">>, PacketSql,
     <<"', '">>, RelativeIdSql,
     <<"')">>].

%% Inserts one scheduled message into the delayed_message table and logs
%% the query result.
add_delayed_message(LServer, SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket, SRelativeId) ->
    Values = prepare_delayed_message(SFromUserName, SToUsername, SServer,
                                     SScheduledTimestamp, SPacket, SRelativeId),
    Insert = [<<"insert into delayed_message(from_jid,to_jid,server,scheduled_time,packet,relative_id) "
                "values ">>,
              join(Values, "")],
    Result = ejabberd_odbc:sql_query(LServer, Insert),
    ?DEBUG("Delayed message inserted? ~p", [Result]).

%% Updates recipient, server, scheduled time and packet of the stored
%% message identified by sender and sender-relative id.
%% Returns the result of ejabberd_odbc:sql_query/2.
edit_delayed_message(LServer, SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket, SRelativeId) ->
    ToSql = ejabberd_odbc:escape(list_to_binary(SToUsername)),
    ServerSql = ejabberd_odbc:escape(list_to_binary(SServer)),
    TimeSql = integer_to_list(SScheduledTimestamp),
    PacketSql = ejabberd_odbc:escape(xml:element_to_binary(SPacket)),
    FromSql = ejabberd_odbc:escape(list_to_binary(SFromUserName)),
    RelativeIdSql = ejabberd_odbc:escape(list_to_binary(SRelativeId)),
    Update = [<<"update delayed_message set to_jid='">>, ToSql,
              <<"' , server='">>, ServerSql,
              <<"' , scheduled_time=">>, TimeSql,
              <<", packet='">>, PacketSql,
              <<"' where from_jid='">>, FromSql,
              <<"' AND relative_id = '">>, RelativeIdSql, <<"';">>],
    ejabberd_odbc:sql_query(LServer, Update).

%% Selects the stored messages whose scheduled_time is earlier than the
%% given timestamp (an integer number of microseconds).
%% Returns {ok, [#delayed_msg{}]} or {error, Reason}; an aborted query is
%% reported as an error as well.
search_delayed_messages(LServer, SScheduledTimestamp) ->
    Limit = encode_timestamp(SScheduledTimestamp),
    Columns = [<<"id">>, <<"from_jid">>, <<"to_jid">>, <<"server">>,
               <<"scheduled_time">>, <<"packet">>, <<"relative_id">>],
    Select = [<<"select id,from_jid,to_jid,server,scheduled_time,packet,relative_id from delayed_message where ">>,
              <<"(scheduled_time < ">>, Limit, <<" OR ">>, Limit, <<" = 0);">>],
    case ejabberd_odbc:sql_query(LServer, Select) of
        {selected, Columns, Rows} ->
            {ok, rows_to_records(Rows)};
        {Failure, Reason} when Failure =:= aborted; Failure =:= error ->
            {error, Reason}
    end.

    %% Converts every selected row into a #delayed_msg{} record, keeping
    %% the order of the rows.
    rows_to_records(Rows) ->
        lists:map(fun row_to_record/1, Rows).

%% Decodes one delayed_message row (a tuple of binaries) into a
%% #delayed_msg{} record:
%%   * id and scheduled_time become an integer and a now()-style triple;
%%   * sender and recipient become JIDs on the row's server with the
%%     resource <<"fb">>;
%%   * the packet is parsed back into an XML element;
%%   * server and relative id become lists.
row_to_record({SId, SFromUserName, SToUsername, SServer, SScheduledTimestamp, SPacket, SRelativeId}) ->
    Micros = list_to_integer(binary_to_list(SScheduledTimestamp)),
    #delayed_msg{id = list_to_integer(binary_to_list(SId)),
                 from = jlib:make_jid(SFromUserName, SServer, <<"fb">>),
                 to = jlib:make_jid(SToUsername, SServer, <<"fb">>),
                 server = binary_to_list(SServer),
                 scheduledTimestamp = microseconds_to_now(Micros),
                 packet = xml_stream:parse_element(SPacket),
                 relativeId = binary_to_list(SRelativeId)}.


%% ------------------------------------------------------------------
%% Helpers

%% Maps two boolean flags and a requested operation to a strategy:
%%   first flag false                    -> not_allowed
%%   both flags true, operation get/set  -> the operation itself
%%   anything else                       -> forbidden
choose_strategy(true, true, Op) when Op =:= get; Op =:= set ->
    Op;
choose_strategy(false, _Second, _Op) ->
    not_allowed;
choose_strategy(_First, _Second, _Op) ->
    forbidden.

%% True when both arguments are JIDs with the same local user and server
%% parts (resources are not compared); false for anything else.
compare_bare_jids(#jid{luser = UserA, lserver = ServerA},
                  #jid{luser = UserB, lserver = ServerB}) ->
    UserA =:= UserB andalso ServerA =:= ServerB;
compare_bare_jids(_, _) ->
    false.

%% Returns the value of the xmlns attribute of an XML element, as given by
%% xml:get_attr_s/2. Anything that is not an #xmlel{} yields <<>>.
element_to_namespace(#xmlel{attrs = Attributes}) ->
    xml:get_attr_s(<<"xmlns">>, Attributes);
element_to_namespace(_NotAnElement) ->
    <<>>.

%% Pairs every element with its namespace, as {Namespace, Element}.
%% Elements whose namespace is not valid (see is_valid_namespace/1) are
%% skipped; the order of the remaining elements is preserved.
to_map(Elems) ->
    [{Namespace, Elem} || Elem <- Elems,
                          Namespace <- [element_to_namespace(Elem)],
                          is_valid_namespace(Namespace)].

%% A namespace is valid unless it is the empty binary.
is_valid_namespace(<<>>) -> false;
is_valid_namespace(_Namespace) -> true.

%% Turns an IQ into an error reply: the type becomes `error' and the error
%% stanza is placed after the original sub-element.
error_iq(#iq{sub_el = Payload} = Request, ErrorStanza) ->
    Request#iq{type = error, sub_el = [Payload, ErrorStanza]}.


%% Converts a now()-style {MegaSecs, Secs, MicroSecs} triple to microseconds
%% and adds FromNow to the result. FromNow is added as-is, i.e. it is
%% interpreted as a number of microseconds.
from_now_to_microseconds({Mega, Secs, Micro}, FromNow) ->
    TotalSecs = Mega * 1000000 + Secs,
    TotalSecs * 1000000 + Micro + FromNow.

%% Converts a now()-style {MegaSecs, Secs, MicroSecs} triple to a single
%% integer number of microseconds.
now_to_microseconds({Mega, Secs, Micro}) ->
    TotalSecs = Mega * 1000000 + Secs,
    TotalSecs * 1000000 + Micro.


%% Renders an integer timestamp as its decimal string, ready to be placed
%% unquoted in an SQL statement.
encode_timestamp(TimeStamp) ->
    integer_to_list(TimeStamp, 10).

%% Like encode_timestamp/1, except that the atom `never' is rendered as the
%% string "null".
maybe_encode_timestamp(TimeStamp) ->
    case TimeStamp of
        never -> "null";
        _ -> encode_timestamp(TimeStamp)
    end.

%% Splits an integer number of microseconds into a now()-style
%% {MegaSecs, Secs, MicroSecs} triple; the inverse of now_to_microseconds/1.
microseconds_to_now(MicroSeconds) when is_integer(MicroSeconds) ->
    MicroPart = MicroSeconds rem 1000000,
    WholeSecs = MicroSeconds div 1000000,
    {WholeSecs div 1000000, WholeSecs rem 1000000, MicroPart}.

%% Interleaves Sep between the elements of a list, returning a nested list
%% (suitable as an iolist when the elements and Sep are iodata): the first
%% element, followed by a list of [Sep, Element] pairs for the rest.
join([], _Sep) ->
    [];
join([First | Rest], Sep) ->
    [First, lists:map(fun(Item) -> [Sep, Item] end, Rest)].
3
You should give the whole module code, or at least a working subset. This code won't do anything when init is called, since it doesn't loop or spawn any process. I guess it is part of a gen_server, or your own server, but all of that code is missing. – Pascal
Make sure your process is not brutal_kill'd. – Lol4t0
@Pascal I have now posted all the code. – Luz Angeles
@Lol4t0 thank you for your response. How can I check that my process is not being brutal_kill'd? – Luz Angeles
Well, it's not the case here. Are you sure your process terminates at all in the first place? – Lol4t0

3 Answers

0
votes

In general, one shouldn't use terminate unless necessary to write custom processes or behaviours which ought to plug into the supervision hierarchy.

I've just skimmed through the code, so please correct me if I'm wrong in any of my assumptions.

You're dealing with two things here: an ejabberd module and a process supporting some of its functionality and you intertwine their initialization a bit too much.

By ejabberd module I mean a component of the server (which doesn't necessarily mean that any stanza coming from a client will pass process boundaries when handled by the server when code from the component is called).

You introduce a process to be able to measure time ticks which is fine. However, you also put some module initialization into your process initialization function (init/1).

I see that none of your IQ/hook handlers actually call your process in their code - this is good, since it means they're not really dependent on the process being there or not (like when it is restarted by a supervisor).

I'd suggest to setup your module in start/2 (register hook handlers, IQ handlers, ...), tear it down in stop/1 and assume the supervisor will restart the process in case of runtime errors - don't tie module setup/teardown and process lifetime by placing handler (de)registration in init/terminate. If the supervisor has to restart your process, it should be almost immediate - and even then, your IQ/hook handlers don't depend on the module being there - why tie them in init/terminate?

There's one more thing - if you want the module to only start after the process is running (which might be required sometimes, though is not strictly required here) then remember that supervisor:start_child is synchronous and blocking - it will only return once the new child has been successfully started. Once you have this guarantee (i.e. the call has returned with the correct return value) you can safely continue with module setup (hook / IQ handler setup).


If you require your code to scale well then timer is not the best choice - use erlang:send_after/3 or the Timeout part of a return value expected from handle_* callbacks.

0
votes

I never used ejabberd, so my comments are very general.

You use both the gen_mod behaviour (mandatory for all ejabberd modules) and the gen_server behaviour from OTP. I understand you did this in order to use the tick message, but you could use the function timer:apply_interval(Time, Module, Function, Args) to get the same result. You could then remove the gen_server behaviour and all of its callbacks, leaving only the start and stop gen_mod callbacks.

In the init/1 function you call process_flag(trap_exit, true), I think that it is, in general, a bad idea, especially when you don't have any management of error messages (in your code it should be handled by one handle_info clause, but here it is the handle_info(_Info, State) -> {noreply, State}. which will match).

0
votes

Thank you for your answers. As I am very new to Erlang, I didn't know that I had to compile the .erl file. I compiled it and now it is working.

This is the instruction to compile erl files:

erlc -I /path/to/include/files/ module_name.erl