I created a behaviour based on gen_server which I named gen_select. Using it, you write a callback module with the module attribute -behaviour(gen_select). In your init/1 callback you open an ets or dets file and define a match specification and a limit. The process will chunk through the table, calling your handle_record/2 callback for each record until the end of the file. I found this a handy paradigm for some "big data" sort of work I've been doing.
You could use it on the underlying ets table of your mnesia table, if that's appropriate, or modify it to use mnesia:select/4.
%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%% large number of records are read from an {@link //stdlib/ets. ets}
%%% or {@link //stdlib/dets. dets} table. This makes it possible
%%% to have supervised workers mapping over all the records in a table.
%%% The user will call `gen_select:start_link/3', from a supervisor, to
%%% create a process which will iterate over the selected records of a
%%% table. The `init/1' callback should open the table to
%%% be read and construct a match specification to be used to select
%%% records from the table. It should return a tuple of the form:
%%% ```
%%% {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%% TableType :: ets | dets
%%% Table :: ets:tid() | atom() % when TableType=ets
%%% Table :: dets:tab_name() % when TableType=dets
%%% MatchSpec :: match_spec() % see ets:select/2
%%% Limit :: integer() % see ets:select/3
%%% State :: term()
%%% Reason :: term()
%%% '''
%%% After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%% or {@link //stdlib/dets:select/3. dets:select/3} is called
%%% using the `match_spec()' and `Limit' returned by `init/1'. The
%%% callback function `handle_record/2' will then be called for each
%%% record returned then `select/1' will be called to get more records.
%%% This is repeated until the end of the table is reached when the
%%% callback `terminate/2' is called with `Reason=eof'.
%%%
-module(gen_select).
-author('[email protected]').
%% export the gen_select API
-export([start_link/3]).
%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).
%% exports used internally
-export([init_it/6]).
%% define the callback exports of a module behaving as gen_select
-type state() :: term().
-callback init(Args :: term()) ->
{ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
MatchSpec :: ets:match_spec(), Limit :: non_neg_integer(), State :: state()}
| {stop, Reason :: term()} | ignore.
-callback handle_record(Record :: tuple(), State :: state()) ->
{next_record, NewState :: state()}
| {stop, Reason :: term(), NewState :: state()}.
-callback terminate(Reason :: eof | term(), State :: state()) ->
any().
-import(error_logger, [format/2]).
%%----------------------------------------------------------------------
%% gen_select API
%%----------------------------------------------------------------------
-spec start_link(Mod, Args, Options) -> gen:start_ret() when
    Mod :: atom(),
    Args :: term(),
    Options :: gen:options().
%% @doc Starts a linked {@module} process, suitable for use as a
%% supervisor child.
%%
%% `Mod' is the callback module, `Args' is handed to `Mod:init/1' and
%% `Options' is passed through to `gen:start/5' (the `debug' option is
%% also read by this module when the process initialises).
%%
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).
%%----------------------------------------------------------------------
%% internal exports
%%----------------------------------------------------------------------
-spec init_it(Starter :: pid(), LinkP :: gen:linkage(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(), Options :: gen:options()) ->
    no_return().
%% @doc Initialises the process; runs in the newly spawned process as a
%% result of the `gen:start/5' call made by `start_link/3'.
%%
%% Calls `CallBackMod:init/1'. On `{ok, TableMod, Table, MatchSpec, Limit,
%% State}' the starter is acknowledged with `{ok, self()}' exactly once and
%% the first chunk is read with `TableMod:select/3', so that both `ets' and
%% `dets' tables are handled by the module the callback asked for.
%%
%% Once the starter has been acknowledged, every failure of the first
%% select (including an empty table) goes through `terminate/4', so the
%% callback's `terminate/2' gets a chance to release the table and the
%% starter never receives a second acknowledgement.
%%
%% `Pid' is only used to identify the process when reporting erroneous
%% debug options.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) ->
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            %% Dispatch on the table module returned by the callback
            %% rather than hard-coding ets.
            case catch TableMod:select(Table, MatchSpec, Limit) of
                {Matches, Cont} when is_list(Matches) ->
                    loop1(Parent, CallBackMod, Debug, State,
                          TableMod, Cont, Matches);
                '$end_of_table' ->
                    %% Empty selection: same outcome as reaching the end
                    %% of the table in the main loop.
                    terminate(eof, CallBackMod, Debug, State);
                {error, Reason} ->
                    terminate(Reason, CallBackMod, Debug, State);
                {'EXIT', Reason} ->
                    terminate(Reason, CallBackMod, Debug, State)
            end;
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.
%%----------------------------------------------------------------------
%% system process callbacks
%%----------------------------------------------------------------------
%% Loop data handed to sys:handle_system_msg/6 and received back in the
%% system_* callbacks: [CallBackMod, State, TableMod, Cont, Matches].
-type misc() :: [CallBackMod :: atom() | [State :: state()
        | [TableMod :: atom() | [Cont :: term()
        | [Matches :: [tuple()] | []]]]]].
-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -> no_return().
%% @doc System process callback: resumes the main loop with the loop data
%% that was passed to `sys:handle_system_msg/6'.
%% @private
system_continue(Parent, Debug,
                [CallBackMod, LoopState, TableMod, Continuation, Pending]) ->
    loop1(Parent, CallBackMod, Debug, LoopState,
          TableMod, Continuation, Pending).
-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -> no_return().
%% @doc System process callback: shuts the process down with `Reason',
%% giving the callback module's `terminate/2' the current state. The
%% table module, continuation and pending records are not needed here.
%% @private
system_terminate(Reason, _Parent, Debug,
                 [CallBackMod, LoopState, _TableMod, _Continuation, _Pending]) ->
    terminate(Reason, CallBackMod, Debug, LoopState).
-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) ->
    {ok, NewMisc :: misc()}.
%% @doc System process callback: lets the callback module convert its
%% state during a code change.
%%
%% `code_change/3' is not among the callbacks declared by this behaviour,
%% so it is treated as optional (in the same way as `format_status/2'):
%% when the callback module does not export it the loop data is kept
%% unchanged. When it is exported, `{ok, NewState}' replaces the state and
%% any other result (including a caught exit) is passed back unchanged to
%% the caller.
%% @private
system_code_change([CallBackMod, State, TableMod, Cont, Matches] = Misc,
                   _Module, OldVsn, Extra) ->
    case erlang:function_exported(CallBackMod, code_change, 3) of
        false ->
            {ok, Misc};
        true ->
            case catch CallBackMod:code_change(OldVsn, State, Extra) of
                {ok, NewState} ->
                    {ok, [CallBackMod, NewState, TableMod, Cont, Matches]};
                Other ->
                    Other
            end
    end.
-type pdict() :: [{Key :: term(), Value :: term()}].
-type status_data() :: [PDict :: pdict() | [SysState :: term()
        | [Parent :: pid() | [Debug :: [gen:dbg_opt()] | [Misc :: misc() | []]]]]].
-spec format_status(Opt :: normal | terminate, StatusData :: status_data()) ->
    [tuple()].
%% @doc Builds the status report for this process.
%%
%% The result is a header, a general data section (system state, parent
%% and logged debug events) and a callback specific tail. The tail comes
%% from the callback module's optional `format_status/2'; when that is not
%% exported, or it fails, the raw state is shown instead. A non-list
%% result from the callback is wrapped in a list.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug,
                    [CallBackMod, State, _TableMod, _Cont, _Matches]]) ->
    Title = gen:format_status_header("Status for table reader", self()),
    Events = sys:get_debug(log, Debug, []),
    Fallback = [{data, [{"State", State}]}],
    Specific =
        case erlang:function_exported(CallBackMod, format_status, 2) of
            false ->
                Fallback;
            true ->
                case catch CallBackMod:format_status(Opt, [PDict, State]) of
                    {'EXIT', _} ->
                        Fallback;
                    Items when is_list(Items) ->
                        Items;
                    Single ->
                        [Single]
                end
        end,
    General = {data, [{"Status", SysState},
                      {"Parent", Parent},
                      {"Logged events", Events}]},
    [{header, Title}, General | Specific].
%%----------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------
-spec loop1(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(),
        Cont :: term(), Matches :: [tuple()]) -> no_return().
%% @doc Main loop: drains the mailbox, then hands over to `loop2/7'.
%%
%% System messages are delegated to `sys:handle_system_msg/6' and an exit
%% signal from the parent terminates the process. Any other message is
%% reported as a debug event and dropped, after which the loop continues
%% with the updated debug state; it must not end the process. The zero
%% timeout means the process never blocks waiting for messages: as soon
%% as the mailbox is empty the next record is processed.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) ->
    receive
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                                  [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Msg ->
            %% sys:handle_debug/4 returns the new debug state; keep it and
            %% carry on looping instead of falling out of the loop.
            NewDebug = sys:handle_debug(Debug, fun print_event/3,
                                        self(), {in, Msg}),
            loop1(Parent, CallBackMod, NewDebug, State, TableMod, Cont, Matches)
    after 0 ->
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.
-spec loop2(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -> no_return().
%% @doc Processes one pending record or, when none are left, reads the
%% next chunk with `TableMod:select/1'.
%%
%% With a pending record, `CallBackMod:handle_record/2' is called:
%% `{next_record, NewState}' continues with the remaining records,
%% `{stop, Reason, NewState}' and a crash terminate the process, and any
%% other return value terminates it with `{bad_return_value, Value}' so
%% that the callback's `terminate/2' still runs.
%%
%% With no pending records the continuation is used to fetch more; the
%% read is reported as a debug event and the resulting debug state is
%% carried forward. `` '$end_of_table' '' terminates with reason `eof'.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [H | T]) ->
    case catch CallBackMod:handle_record(H, State) of
        {next_record, NewState} ->
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, T);
        {stop, Reason, NewState} ->
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Other ->
            %% Same error term as init_it/6 uses for a bad init/1 return.
            terminate({bad_return_value, Other}, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) ->
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) ->
            %% Keep the debug state returned by sys:handle_debug/4,
            %% otherwise logged events are lost.
            NewDebug = sys:handle_debug(Debug, fun print_event/3,
                                        self(), {read, Matches}),
            loop1(Parent, CallBackMod, NewDebug, State,
                  TableMod, NewCont, Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.
-spec terminate(Reason :: term(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state()) -> no_return().
%% @doc Terminates the {@module} process.
%%
%% The callback module's `terminate/2' is called first. If it crashes, the
%% crash reason is logged and becomes the exit reason. Otherwise the
%% process exits with `Reason'; for anything other than `normal',
%% `shutdown' or `{shutdown, _}' an error report is logged beforehand,
%% showing the state as rendered by the callback's optional
%% `format_status/2' when available.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) ->
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', Crash} ->
            error_info(Crash, State, Debug),
            exit(Crash);
        _Ignored ->
            case Reason of
                Quiet when Quiet =:= normal; Quiet =:= shutdown ->
                    exit(Quiet);
                {shutdown, _} ->
                    exit(Reason);
                _Abnormal ->
                    Shown =
                        case erlang:function_exported(CallBackMod,
                                                      format_status, 2) of
                            false ->
                                State;
                            true ->
                                case catch CallBackMod:format_status(
                                             terminate, [get(), State]) of
                                    {'EXIT', _} ->
                                        State;
                                    Formatted ->
                                        Formatted
                                end
                        end,
                    error_info(Reason, Shown, Debug),
                    exit(Reason)
            end
    end.
-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -> ok.
%% @doc Writes an error report about the terminating process to the error
%% logger and prints the logged debug events.
%%
%% An `undef' reason is made more explicit first: it is reported as
%% 'module could not be loaded' when the module is not loaded, or as
%% 'function not exported' when the module is loaded but lacks the
%% function.
%% @hidden
error_info(Reason, State, Debug) ->
    Explained =
        case Reason of
            {undef, [{M, F, A, _Loc} | _] = Stack} ->
                case code:is_loaded(M) of
                    false ->
                        {'module could not be loaded', Stack};
                    _ ->
                        case erlang:function_exported(M, F, length(A)) of
                            true ->
                                Reason;
                            false ->
                                {'function not exported', Stack}
                        end
                end;
            _ ->
                Reason
        end,
    error_logger:format("** Table reader ~p terminating \n"
                        "** When Server state == ~p~n"
                        "** Reason for termination == ~n** ~p~n",
                        [self(), State, Explained]),
    sys:print_log(Debug),
    ok.
%% Looks up option `Key' in an option list. Returns `{ok, Value}' for the
%% first `{Key, Value}' pair found, or `false' when there is none.
%% Elements that are not such a pair are skipped.
opt(_Key, []) ->
    false;
opt(Key, [{Key, Found} | _Rest]) ->
    {ok, Found};
opt(Key, [_Skipped | Rest]) ->
    opt(Key, Rest).
%% Extracts the `debug' entry from the start options (defaulting to an
%% empty list when absent) and turns it into a debug structure via
%% dbg_options/2. `Name' identifies the process in any error report.
debug_options(Name, Opts) ->
    Requested =
        case opt(debug, Opts) of
            {ok, Flags} -> Flags;
            _ -> []
        end,
    dbg_options(Name, Requested).
%% Converts the requested debug options into a debug structure. When no
%% options were requested, `[log, statistics]' is used if the emulator was
%% started with the `generic_debug' argument, otherwise nothing is enabled.
dbg_options(Name, Requested) when Requested =/= [] ->
    dbg_opts(Name, Requested);
dbg_options(Name, []) ->
    Defaults =
        case init:get_argument(generic_debug) of
            error -> [];
            _ -> [log, statistics]
        end,
    dbg_opts(Name, Defaults).
%% Builds the debug structure with sys:debug_options/1. If that call
%% fails, the problem is reported to the error logger and debugging is
%% disabled (an empty list is returned).
dbg_opts(Name, Opts) ->
    case catch sys:debug_options(Opts) of
        {'EXIT', _} ->
            error_logger:format("~p: ignoring erroneous debug options - ~p~n",
                                [Name, Opts]),
            [];
        DebugStruct ->
            DebugStruct
    end.
-spec print_event(IoDevice :: io:device(), Event :: term(), Pid :: pid()) -> ok.
%% @doc Formatting fun handed to `sys:handle_debug/4': prints a one line
%% description of a debug event. `{read, Matches}' reports how many
%% records a select returned and `{in, Msg}' reports a received message.
print_event(IoDevice, {read, Records}, Pid) ->
    io:format(IoDevice, "*DBG* ~p read ~b records~n", [Pid, length(Records)]);
print_event(IoDevice, {in, Received}, Pid) ->
    io:format(IoDevice, "*DBG* ~p got ~p~n", [Pid, Received]).
iterating
over the whole table, so that you stream objects that match your specification to the remote node in chunks or bits? – Muzaaya Joshua