Actually, the socket routing is done based on how to define your topics in your projects Socket module with the channel
API. For my Slack clone, I use three channels. I have a system level channel to handle presence update, a user channel, and a room channel.
Any given user is subscribed to 0 or 1 channels. However, users may be subscribed to a number of channels.
For messages going out to a specific room, I broadcast them over the room channel.
When I detect unread messages, notifications, or badges for a particular room, I use the user channel. Each user channel stores the list of rooms the user has subscribed too (they are listed on the client's side bar).
The trick to all this is using a couple channel APIs, mainly intercept
, handle_out
, My.Endpoint.subscribe
, and handle_info(%Broadcast{},socket)
.
- I use
intercept
to catch broadcasted messages that I want to either ignore, or manipulate before sending them out.
- In the user channel, I subscribe to events broadcast from the room channel
- When you subscribe, you get a
handle_info
call with the %Broadcast{}
struct that includes the topic, event, and payload of the broadcasted message.
Here are couple pieces of my code:
defmodule UcxChat.UserSocket do
use Phoenix.Socket
alias UcxChat.{User, Repo, MessageService, SideNavService}
require UcxChat.ChatConstants, as: CC
## Channels
channel CC.chan_room <> "*", UcxChat.RoomChannel # "ucxchat:"
channel CC.chan_user <> "*", UcxChat.UserChannel # "user:"
channel CC.chan_system <> "*", UcxChat.SystemChannel # "system:"
# ...
end
# user_channel.ex
# ...
intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
#...
def handle_out("room:join", msg, socket) do
%{room: room} = msg
UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
update_rooms_list(socket)
clear_unreads(room, socket)
{:noreply, subscribe([room], socket)}
end
def handle_out("room:leave" = ev, msg, socket) do
%{room: room} = msg
debug ev, msg, "assigns: #{inspect socket.assigns}"
socket.endpoint.unsubscribe(CC.chan_room <> room)
update_rooms_list(socket)
{:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
end
# ...
defp subscribe(channels, socket) do
# debug inspect(channels), ""
Enum.reduce channels, socket, fn channel, acc ->
subscribed = acc.assigns[:subscribed]
if channel in subscribed do
acc
else
socket.endpoint.subscribe(CC.chan_room <> channel)
assign(acc, :subscribed, [channel | subscribed])
end
end
end
# ...
end
I also use the user_channel for all events related to a specific user like client state, error messages, etc.