9
votes

Disclaimer: I'm pretty new to Erlang and OTP.

I want a simple pubsub in Erlang/OTP, where processes could subscribe at some "hub" and receive a copy of messages that were sent to that hub.

I know about gen_event, but it processes events in one single event manager process, while I want every subscriber to be a separate, autonomous process. Also, I was unable to grok gen_event's handlers supervision. Unfortunately, Google results were full of XMPP (Ejabberd) and RabbitMQ links, so I didn't find anything relevant to my idea.

My idea is that such pubsub model seamlessly maps to supervision tree. So I thought to extend the supervisor (a gen_server under the hood) to be able to send a cast message to all its children.

I've hacked this in my quick-and-dirty custom "dispatcher" behavior:

-module(dispatcher).
-extends(supervisor).
-export([notify/2, start_link/2, start_link/3, handle_cast/2]).

start_link(Mod, Args) ->
    gen_server:start_link(dispatcher, {self, Mod, Args}, []).

start_link(SupName, Mod, Args) ->
    gen_server:start_link(SupName, dispatcher, {SupName, Mod, Args}, []).

notify(Dispatcher, Message) ->
    gen_server:cast(Dispatcher, {message, Message}).

handle_cast({message, Message}, State) ->
    {reply, Children, State} = supervisor:handle_call(which_children, dummy, State),
    Pids = lists:filter(fun(Pid) -> is_pid(Pid) end,
                 lists:map(fun({_Id, Child, _Type, _Modules}) -> Child end,
                           Children)),
    [gen_server:cast(Pid, Message) || Pid <- Pids],
    {noreply, State}.

However, while everything seem to work fine at the first glance (children receive messages and are seamlessly restarted when they fail), I wonder whenever this was a good idea.

Could someone, please, criticize (or approve) my approach, and/or recommend some alternatives?

4
Are the messages stored and new processes subscribing get the whole history of them. Or is it that messages are passed on only from the point of time when a process subscribed?Peer Stritzinger
The latter; like in Redis or 0MQ Pub/Sub. I'll take another look at gen_event, thanks.drdaeman

4 Answers

11
votes

I've recently used gproc to implement pubsub. The example from the readme does the trick.

subscribe(EventType) ->
    %% Gproc notation: {p, l, Name} means {(p)roperty, (l)ocal, Name}
    gproc:reg({p, l, {?MODULE, EventType}}).

notify(EventType, Msg) ->
    Key = {?MODULE, EventType},
    gproc:send({p, l, Key}, {self(), Key, Msg}).
10
votes

From your code it looks to me that gen_event handlers are a perfect match.

The handler callbacks are called from one central process dispatching the messages, but these callbacks shouldn't do much work.

So if you need a autonomous process with its own state for the subscribers, just send a message in the event callback.

Usually these autonomous processes would be gen_servers and you just would call gen_server:cast from your event callbacks.

Supervision is a separate issue, that can be handled by the usual supervision infrastructure that comes with OTP. How you want to do supervision depends on the semantics of your subscriber processes. If they are all identical servers, you could use a simple_one_for_one for example.

In the init callback of the subscriber processes you can put the gen_event:add_handler calls that adds them to the event manager.

You can even use the event manager as supervisor if you use the gen_event:add_sup_handler function to add your processes if the semantics of this suits you.

Online resources for understanding gen_event better: Learn you some Erlang chapter

Otherwise the Erlang books all have some gen_event introduction. Probably the most thorough one you can find in Erlang and OTP in Action

Oh and BTW: I wouldn't hack up your own supervisors for this.

1
votes

A very simple example where you do it all yourself is in my very basic chat_demo which is a simple web-based chat server. Look at chat_backend.erl (or chat_backend.lfe if you like parentheses) which allows users to subscribe and they will then be sent all messages that arrive at the backend. It does not fit into supervision trees though the modification is simple (although it does use proc_lib to get better error messages).

-2
votes

Sometimes ago, i read about øMQ (ZeroMQ), which has a bunch of bindings to different programming languages.

http://www.zeromq.org/

http://www.zeromq.org/bindings:erlang

If it must not an pure erlang solution, this could be a choice.