1
votes

I have a large table in Mnesia and, for various reasons (not important here; say that I am executing the select remotely and the result has to be sent over the network using some 3rd party libraries), I can't select all rows in one select. I am already splitting the select to retrieve only a specific number of columns at once.

e.g. this is an example of a select to retrieve only specific columns:

mnesia:dirty_select(table, [{{table,'$1','_','$3','$4','_','_','_'},[],['$$']}]).

I run that select twice, with a different set of columns each time, and then combine the results. But it has now turned out that one of the columns is also too large to be retrieved in one select. So I'd like to split one large select into two selects, each retrieving only half of the rows in that column. Is there an easy way of retrieving, say, every second row? Something like select only odd rows, and then select only even rows? Or maybe a way of retrieving the first half and then the second half of the rows?

I tried to select all rows from one of the columns and then use it as an index to retrieve specific rows. This works but takes quite some time to construct the select and then to execute it.

EDIT: Sorry that I didn't stress enough the fact that the select is being executed remotely. I know about iterating over records or accessing the file directly, but the challenge here is that the records in question have to be retrieved using a relatively small number of commands, and those commands have to be capable of executing remotely.

For example:

  1. Select only the first column (simple single mnesia:dirty_select command).
  2. Once the result is retrieved (over the network), split it into two sets and use them as keys to construct selects to get specific records (each select would contain a long list of keys to retrieve, but that's fine as they are simple Erlang terms that can be sent over the network).
  3. Retrieve all rows in two steps using those two sets of keys created in 2.

That works but is neither easy nor optimal, as it sends quite a lot of data both ways. There may not be an easy solution unless the selects are constructed to take into account the specific data contained in each column and row (e.g. match all rows with names in the first column starting with letters 'A' to 'M'). I am just not sure what is possible using standard Mnesia commands and was hoping for an easier solution.
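
A split on the data itself, such as the 'A' to 'M' idea, can be expressed as a guard in the match specification, so each half is still a single select. The following is only a sketch under assumptions: the module name `split_select_sketch`, the function `halves/2`, the three-field record layout and the pivot value are all hypothetical, and an in-memory ets table stands in for the Mnesia table because both use the same match specification syntax. With Mnesia, the two specs would be passed to mnesia:dirty_select/2 instead.

```erlang
-module(split_select_sketch).
-export([halves/2, demo/0]).

%% Build two match specs that split a table by key range.
%% Hypothetical record layout: {table, Key, Ignored, Value}.
%% Pivot is any term; keys below it go to Lower, the rest to Upper.
halves(Pivot, Body) ->
    Head = {table, '$1', '_', '$3'},
    Lower = [{Head, [{'<', '$1', {const, Pivot}}], Body}],
    Upper = [{Head, [{'>=', '$1', {const, Pivot}}], Body}],
    {Lower, Upper}.

%% Demonstration on a throwaway ets table (assumption: stand-in for Mnesia).
demo() ->
    T = ets:new(table, [set, {keypos, 2}]),
    ets:insert(T, [{table, <<"alice">>, x, 1},
                   {table, <<"bob">>, x, 2},
                   {table, <<"nina">>, x, 3},
                   {table, <<"zed">>, x, 4}]),
    {Lower, Upper} = halves(<<"n">>, ['$$']),
    A = lists:sort(ets:select(T, Lower)),
    B = lists:sort(ets:select(T, Upper)),
    ets:delete(T),
    {A, B}.
```

Choosing a good pivot still requires some knowledge of how the keys are distributed, so the two halves may be uneven.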

2
I know this isn't directly relevant to your question, but what are the symptoms of too much data to retrieve in one select? One thing to keep in mind when splitting queries apart is the increased potential for race conditions. - Sage Mitchell
reading your question, it seems that the limitation you are facing does not come from Mnesia nor Erlang. So why don't you get your object list in one single select and then split the result in as many pieces as required by your 3rd party library? - Pascal
It's just too much explaining why I need to retrieve the data using multiple selects. I am aware of all the issues this involves. As I said, I am executing them remotely, and the issue is that the 3rd party library can send only a limited amount of data as a result of executing one command. But anyway, that's not what the question is about, however I appreciate your input. - Greg
how about iterating over the whole table, so that you stream objects that match your specification to the remote node in chunks or bits? - Muzaaya Joshua
@MuzaayaJoshua, as I mentioned I am executing the select remotely. I can't match each record over the network as that would kill the performance. The only thing I can do is a select. - Greg

2 Answers

0
votes

If you put some key on your table that is an auto-incrementing integer, then you could roughly do this with a QLC.

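%% Table must be a QLC handle, e.g. Table = mnesia:table(table).
%% Evaluate the resulting handles with qlc:e/1 inside a Mnesia activity.
Evens = qlc:q([Rec || Rec <- Table, (Rec#table.int_key rem 2) =:= 0]).
Odds  = qlc:q([Rec || Rec <- Table, (Rec#table.int_key rem 2) =:= 1]).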
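
The same even/odd test can also be written as a guard in a plain match specification, which may suit a setup where only a select can be run remotely. This is a minimal sketch, assuming a non-negative integer key in the second position. The module name `parity_select_sketch`, the function `parity_spec/1` and the record layout are hypothetical, and an ets table stands in for the Mnesia table, whose selects accept the same match specification syntax.

```erlang
-module(parity_select_sketch).
-export([parity_spec/1, demo/0]).

%% Rem = 0 selects records with even keys, Rem = 1 those with odd keys.
%% Hypothetical record layout: {table, IntKey, Payload}.
%% Assumes non-negative keys (a negative odd key has remainder -1).
parity_spec(Rem) when Rem =:= 0; Rem =:= 1 ->
    [{{table, '$1', '$2'},
      [{'=:=', {'rem', '$1', 2}, Rem}],
      ['$_']}].

%% Demonstration on a throwaway ets table (assumption: stand-in for Mnesia).
demo() ->
    T = ets:new(table, [ordered_set, {keypos, 2}]),
    ets:insert(T, [{table, N, N * N} || N <- lists:seq(1, 6)]),
    Evens = ets:select(T, parity_spec(0)),
    Odds = ets:select(T, parity_spec(1)),
    ets:delete(T),
    {Evens, Odds}.
```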

An alternative, if you don't have or don't want such a key, is to use qlc:fold/3 over your table to filter out every second record. Mnesia should use a temporary file for iterating over the data if it needs to for memory reasons.

CollectOddsFun = fun(Rec, {N, List}) ->
                     NewList = case N rem 2 of
                       0 -> [Rec|List];
                       1 -> List
                     end,
                     {N+1, NewList}
                  end,

qlc:fold(CollectOddsFun, {0, []}, Table).
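
A variant of the fold above that keeps both halves in one pass might look like the sketch below. The module name `fold_split_sketch` and the function `split_alternate/1` are hypothetical, and the demonstration uses ets:table/1 as the QLC handle purely so that it runs without a Mnesia schema; mnesia:table/1 inside a Mnesia activity would take its place.

```erlang
-module(fold_split_sketch).
-export([split_alternate/1, demo/0]).

%% Walk a QLC handle once and deal the records alternately
%% into two lists, preserving traversal order in each.
split_alternate(QH) ->
    Fun = fun(Rec, {N, Firsts, Seconds}) ->
                  case N rem 2 of
                      0 -> {N + 1, [Rec | Firsts], Seconds};
                      1 -> {N + 1, Firsts, [Rec | Seconds]}
                  end
          end,
    {_Count, Firsts, Seconds} = qlc:fold(Fun, {0, [], []}, QH),
    {lists:reverse(Firsts), lists:reverse(Seconds)}.

%% Demonstration on a throwaway ets table (assumption: stand-in for Mnesia).
demo() ->
    T = ets:new(table, [ordered_set, {keypos, 2}]),
    ets:insert(T, [{table, K, v} || K <- [a, b, c, d, e]]),
    Result = split_alternate(ets:table(T)),
    ets:delete(T),
    Result.
```

Note that the fold runs where the table lives and still builds both lists in memory there, so it splits the result but does not by itself reduce the work done on the node.
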
0
votes

I created a behaviour based on gen_server which I named gen_select. Using it you write a callback module using the module attribute -behaviour(gen_select). In your init/1 callback you open an ets or dets file and define a match specification and a limit. The process will chunk through the table calling your handle_record/2 callback for each record until the end of the file. I found this a handy paradigm for some "big data" sort of work I've been doing. You could use it on the underlying ets table of your mnesia table, if that's appropriate, or modify it to use mnesia:select/4.

%%% gen_select.erl
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% @doc This module implements a behaviour pattern where a potentially
%%%     large number of records are read from an {@link //stdlib/ets. ets}
%%%     or {@link //stdlib/dets. dets} table.  This is used in an application
%%%     to have supervised workers mapping over all the records in a table.
%%%     The user will call `gen_select:start_link/3', from a supervisor, to
%%%     create a process which will iterate over the selected records of a
%%%     table.  The `init/1' callback should open the table to
%%%     be read and construct a match specification to be used to select 
%%%     records from the table.  It should return a tuple of the form:
%%%     ```
%%%     {ok, TableType, Table, MatchSpec, Limit, State} | {stop, Reason} | ignore
%%%         TableType :: ets | dets
%%%         Table :: ets:tid() | atom()  % when TableType=ets
%%%         Table :: dets:tab_name()     % when TableType=dets
%%%         MatchSpec :: match_spec()    % see ets:select/2
%%%         Limit :: integer()           % see ets:select/3
%%%         State :: term()
%%%         Reason :: term()
%%%     '''
%%% After initialization {@link //stdlib/ets:select/3. ets:select/3}
%%% or {@link //stdlib/dets:select/3. dets:select/3} will be called
%%% using the `match_spec()' and `Limit' returned by `init/1'.  The
%%% callback function `handle_record/2' will then be called for each
%%% record returned then `select/1' will be called to get more records.
%%% This is repeated until the end of the table is reached when the
%%% callback `terminate/2' is called with `Reason=eof'.
%%% 
-module(gen_select).
-author('[email protected]').

%% export the gen_select API
-export([start_link/3]).

%% export the callbacks needed for a system process
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([format_status/2]).

%% exports used internally
-export([init_it/6]).

%% define the callback exports of a module behaving as gen_select
-type state() :: term().
-callback init(Args :: term()) ->
    {ok, TableType :: ets | dets, Table :: ets:tid() | atom() | dets:tab_name(),
        MatchSpec :: ets:match_spec(), Limit :: non_neg_integer(), State :: state()}
        | {stop, Reason :: term()} | ignore.
-callback handle_record(Record :: tuple(), State :: state()) ->
    {next_record, NewState :: state()}
        | {stop, Reason :: term(), NewState :: state()}.
-callback terminate(Reason :: eof | term(), State :: state()) ->
    any().

-import(error_logger, [format/2]).

%%----------------------------------------------------------------------
%%  gen_select API
%%----------------------------------------------------------------------

-spec start_link(Mod :: atom(), Args :: term(),
        Options :: gen:options()) -> gen:start_ret().
%% @doc Creates a {@module} process as part of a supervision tree.
%% 
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).

%%----------------------------------------------------------------------
%%  internal exports
%%----------------------------------------------------------------------

-spec init_it(Starter :: pid(), LinkP :: gen:linkage(), Pid :: pid(),
        CallBackMod :: atom(), Args :: term(), Options :: gen:options()) ->
    no_return().
%% @doc Called by {@link //stdlib/gen:start/5. gen:start/5} to initialize
%%  the process.
%%  Copied from //stdlib/gen_server:init_it/6.
%% @hidden
init_it(Starter, Parent, Pid, CallBackMod, Args, Options) ->
    Debug = debug_options(Pid, Options),
    case catch CallBackMod:init(Args) of
        {ok, TableMod, Table, MatchSpec, Limit, State} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            case catch TableMod:select(Table, MatchSpec, Limit) of
                {Matches, Cont} when is_list(Matches) ->
                    loop1(Parent, CallBackMod, Debug, State,
                            TableMod, Cont, Matches);
                '$end_of_table' ->
                    proc_lib:init_ack(Starter, {error, eof}),
                    exit(eof);
                {error, Reason} ->
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason);
                {'EXIT', Reason} ->
                    proc_lib:init_ack(Starter, {error, Reason}),
                    exit(Reason)
            end;
        {stop, Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        ignore ->
            proc_lib:init_ack(Starter, ignore),
            exit(normal);
        {'EXIT', Reason} ->
            proc_lib:init_ack(Starter, {error, Reason}),
            exit(Reason);
        Else ->
            Error = {bad_return_value, Else},
            proc_lib:init_ack(Starter, {error, Error}),
            exit(Error)
    end.

%%----------------------------------------------------------------------
%%  system process callbacks
%%----------------------------------------------------------------------

-type misc() :: [CallBackMod :: atom() | [State :: state()
        | [TableMod :: atom() | [Cont :: term()
        | [Matches :: [tuple()] | []]]]]].

-spec system_continue(Parent :: pid(), Debug :: [gen:dbg_opt()],
        Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to continue.
%% @private
system_continue(Parent, Debug, [CallBackMod, State,
        TableMod, Cont, Matches]) ->
    loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches).

-spec system_terminate(Reason :: term(), Parent :: pid(),
        Debug :: [gen:dbg_opt()], Misc :: misc()) -> no_return().
%% @doc Called by {@link //sys:handle_system_msg/6} to terminate.
%% @private
system_terminate(Reason, _Parent, Debug, [CallBackMod, State,
        _TableMod, _Cont, _Matches]) ->
    terminate(Reason, CallBackMod, Debug, State).

-spec system_code_change(Misc :: misc(), Module :: atom(),
        OldVsn :: undefined | term(), Extra :: term()) ->
    {ok, NewMisc :: misc()}.
%% @doc Called by {@link //sys:handle_system_msg/6} to update `Misc'.
%% @private
system_code_change([CallBackMod, State, TableMod, Cont, Matches],
        _Module, OldVsn, Extra) ->
    case catch CallBackMod:code_change(OldVsn, State, Extra) of
        {ok, NewState} ->
            {ok, [CallBackMod, NewState, TableMod, Cont, Matches]};
        Other ->
            Other
    end.

-type pdict() :: [{Key :: term(), Value :: term()}].
-type status_data() :: [PDict :: pdict() | [SysState :: term()
        | [Parent :: pid() | [Debug :: [gen:dbg_opt()] | [Misc :: misc() | []]]]]].
-spec format_status(Opt :: normal | terminate, StatusData :: status_data()) ->
    [tuple()].
%% @doc Called by {@link //sys:get_status/1} to print state.
%% @private
format_status(Opt, [PDict, SysState, Parent, Debug, 
        [CallBackMod, State, _TableMod, _Cont, _Matches]]) ->
    Header = gen:format_status_header("Status for table reader", self()),
    Log = sys:get_debug(log, Debug, []),
    DefaultStatus = [{data, [{"State", State}]}],
    Specfic = case erlang:function_exported(CallBackMod, format_status, 2) of
        true ->
            case catch CallBackMod:format_status(Opt, [PDict, State]) of
                {'EXIT', _} ->
                    DefaultStatus;
                StatusList when is_list(StatusList) ->
                    StatusList;
                Else ->
                    [Else]
            end;
        _ ->
            DefaultStatus
    end,
    [{header, Header},
            {data, [{"Status", SysState},
                    {"Parent", Parent},
                    {"Logged events", Log}]}
            | Specfic].

%%----------------------------------------------------------------------
%%  internal functions
%%----------------------------------------------------------------------

-spec loop1(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(),
        Cont :: term(), Matches :: [tuple()]) -> no_return().
%% @doc Main loop.
%%  Copied from //stdlib/gen_server:loop1/6.
%% @hidden
loop1(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches) ->
    receive
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                    [CallBackMod, State, TableMod, Cont, Matches]);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        Msg ->
            sys:handle_debug(Debug, fun print_event/3, self(), {in, Msg})
    after 0 ->
        loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, Matches)
    end.

-spec loop2(Parent :: pid(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state(), TableMod :: atom(), Cont :: term(),
        Matches :: [tuple()]) -> no_return().
%% @doc Run the `select/1' function.
%% @hidden
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, [H | T]) ->
    case catch CallBackMod:handle_record(H, State) of
        {next_record, NewState} ->
            loop1(Parent, CallBackMod, Debug, NewState, TableMod, Cont, T);
        {stop, Reason, NewState} ->
            terminate(Reason, CallBackMod, Debug, NewState);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end;
loop2(Parent, CallBackMod, Debug, State, TableMod, Cont, []) ->
    case catch TableMod:select(Cont) of
        {Matches, NewCont} when is_list(Matches) ->
            sys:handle_debug(Debug, fun print_event/3, self(), {read, Matches}),
            loop1(Parent, CallBackMod, Debug, State, TableMod, NewCont, Matches);
        '$end_of_table' ->
            terminate(eof, CallBackMod, Debug, State);
        {error, Reason} ->
            terminate(Reason, CallBackMod, Debug, State);
        {'EXIT', Reason} ->
            terminate(Reason, CallBackMod, Debug, State)
    end.

-spec terminate(Reason :: term(), CallBackMod :: atom(), Debug :: [gen:dbg_opt()],
        State :: state()) -> no_return().
%% @doc Terminate the {@module} process.
%%  Copied from //stdlib/gen_server:terminate/6.
%% @hidden
terminate(Reason, CallBackMod, Debug, State) ->
    case catch CallBackMod:terminate(Reason, State) of
        {'EXIT', R} ->
            error_info(R, State, Debug),
            exit(R);
        _ ->
            case Reason of
                normal ->
                    exit(normal);
                shutdown ->
                    exit(shutdown);
                {shutdown, _} = Shutdown ->
                    exit(Shutdown);
                _ ->
                    FmtState = case erlang:function_exported(CallBackMod,
                            format_status, 2) of
                        true ->
                            case catch CallBackMod:format_status(terminate,
                                    [get(), State]) of
                                {'EXIT', _} ->
                                    State;
                                Else ->
                                    Else
                            end;
                        _ ->
                            State
                    end,
                    error_info(Reason, FmtState, Debug),
                    exit(Reason)
            end
    end.

-spec error_info(Reason :: term(), State :: state(),
        Debug :: [gen:dbg_opt()]) -> ok.
%% @doc Print error log message.
%%  Copied from //stdlib/gen_server:error_info/5.
%% @hidden
error_info(Reason, State, Debug) ->
    Reason1 = case Reason of
        {undef, [{M, F, A, L} | MFAs]} ->
            case code:is_loaded(M) of
                false ->
                    {'module could not be loaded', [{M, F, A, L} | MFAs]};
                _ ->
                    case erlang:function_exported(M, F, length(A)) of
                        true ->
                            Reason;
                        false ->
                            {'function not exported', [{M, F, A, L} | MFAs]}
                    end
            end;
        _ ->
            Reason
    end,
    format("** Table reader ~p terminating \n"
            "** When Server state == ~p~n"
            "** Reason for termination == ~n** ~p~n",
            [self(), State, Reason1]),
    sys:print_log(Debug),
    ok.

%% Copied from //stdlib/gen_server:opt/2
opt(Op, [{Op, Value} | _]) ->
    {ok, Value};
opt(Op, [_ | Options]) ->
    opt(Op, Options);
opt(_, []) ->
    false.

%% Copied from //stdlib/gen_server:debug_options/2
debug_options(Name, Opts) ->
    case opt(debug, Opts) of
        {ok, Options} ->
            dbg_options(Name, Options);
        _ ->
            dbg_options(Name, [])
    end.

%% Copied from //stdlib/gen_server:dbg_options/2
dbg_options(Name, []) ->
    Opts = case init:get_argument(generic_debug) of
        error ->
            [];
        _ ->
            [log, statistics]
    end,
    dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
    dbg_opts(Name, Opts).

%% Copied from //stdlib/gen_server:dbg_opts/2
dbg_opts(Name, Opts) ->
    case catch sys:debug_options(Opts) of
        {'EXIT',_} ->
            format("~p: ignoring erroneous debug options - ~p~n",
                    [Name, Opts]),
            [];
        Dbg ->
            Dbg
    end.

-spec print_event(IoDevice :: io:device(), Event :: term(), Pid :: pid()) -> ok.
%% @doc Called by {@link //sys:handle_debug/4} to print trace events.
print_event(Dev, {in, Msg}, Pid) ->
    io:format(Dev, "*DBG* ~p got ~p~n", [Pid, Msg]);
print_event(Dev, {read, Matches}, Pid) ->
    io:format(Dev, "*DBG* ~p read ~b records~n", [Pid, length(Matches)]).
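
Stripped of the behaviour machinery, the chunked traversal that the module above wraps comes down to calling select/3 once and then select/1 with the continuation until the end of the table. The sketch below shows only that core loop; the module name `chunk_select_sketch` and the function `fold_chunks/5` are hypothetical and not part of gen_select. mnesia:select/4 and mnesia:select/1 return the same `{Matches, Cont}` or `'$end_of_table'` shapes, so a Mnesia version run inside a transaction could follow the same pattern.

```erlang
-module(chunk_select_sketch).
-export([fold_chunks/5, demo/0]).

%% Apply Fun(Chunk, Acc) to each chunk of at most roughly Limit matches.
fold_chunks(Tab, MatchSpec, Limit, Fun, Acc0) ->
    loop(ets:select(Tab, MatchSpec, Limit), Fun, Acc0).

%% Keep following the continuation until the table is exhausted.
loop('$end_of_table', _Fun, Acc) ->
    Acc;
loop({Matches, Cont}, Fun, Acc) ->
    loop(ets:select(Cont), Fun, Fun(Matches, Acc)).

%% Demonstration: collect the keys of a small table, two records at a time.
demo() ->
    T = ets:new(table, [ordered_set, {keypos, 2}]),
    ets:insert(T, [{table, N, N} || N <- lists:seq(1, 5)]),
    Collect = fun(Chunk, Acc) -> Acc ++ [K || {table, K, _} <- Chunk] end,
    Keys = fold_chunks(T, [{'_', [], ['$_']}], 2, Collect, []),
    ets:delete(T),
    Keys.
```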