I am fairly new to the Erlang environment.

I am writing an email-testing application that labels each incoming email with a randomly generated routing key on a topic exchange as it enters my system.

Once the emails have been delivered to a queue and processed, I want to label them again with the same randomly generated routing key and route them to another exchange, ready for the final consumer.

This second publishing step is causing me real trouble.

I get data back from a TCP socket (after processing by a third-party program, SpamAssassin) and pattern match on it in handle_info.

I rely on a gen_server to consume the messages first, using the regular amqp_client library (amqp_client/include/amqp_client.hrl).

I use handle_info in my gen_server behaviour and then pattern match on its parameters.

Delivered AMQP messages are detected by matching records in the function heads of the handle_info callback.

The TCP socket works well for talking to SpamAssassin; it returns a 3-tuple with binary string data like this:

{tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam: True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug 2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level: *******\nX-Spam-Status: Yes, score=7.9 required=5.0 tests=EMPTY_MESSAGE,MISSING_DATE,\n\tMISSING_FROM,MISSING_HEADERS,MISSING_MID,MISSING_SUBJECT,\n\tNO_HEADERS_MESSAGE,NO_RECEIVED,NO_RELAYS autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version: 1.0\nContent-Type: multipart/mixed; boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>}

The second handle_info clause matches the answer from the gen_tcp connection fine, but I still have to package it and send it to a topic exchange (the topic_scored_email exchange).

***My gen_server***
handle_info({#'basic.deliver'{routing_key=Key, consumer_tag=Tag}, Content}, State) ->
    #amqp_msg{props = Properties, payload = Payload} = Content,
    #'P_basic'{message_id = MessageId, headers = Headers} = Properties,
    send_to_spamassassin:calcule_score(Payload),
    {noreply, State};
handle_info(Msg, State) ->
    case Msg of
        {_,_,Data} ->
           scored_email:main(Data);
        {_,_} ->
            ok  % 2-tuples are ignored
    end,
    {noreply, State}.

***send_to_spamassassin function***
calcule_score(Message) ->
    case gen_tcp:connect("localhost", 783, [{mode, binary}]) of
        {ok, Sock} ->
            …
            gen_tcp:send(Sock, Message2);
        {error,_} ->
            io:fwrite("Connection error! Quitting...~n")
    end.

***scored_email***
main(Argv) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{virtual_host = <<"/">>}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_scored_email">>,type = <<"topic">>}),
    {RoutingKey, Message} = case Argv of
                                …
%DOING PATTERN MATCHING THAT WORKS HERE
                                …
                            end,
    amqp_channel:cast(Channel,
                      #'basic.publish'{exchange = <<"topic_scored_email">>,
                                       routing_key = RoutingKey},
                      #amqp_msg{payload = Message}).

The first issue is the type of the data (a binary string), but I guess there is a workaround using a BIF such as binary_to_tuple or something like that.

What I struggle to understand is how I can pass the right RoutingKey: since Erlang is functional, there are no side effects or assignment.

That change in data format (AMQP --> raw TCP --> AMQP again) seems impossible (to me) to achieve with the OTP abstractions.

However, I would like to reassemble every processed message with the right routing key matched 5 lines above.

How could I modify my code to do that? I come from imperative languages and have reached my limit here…

Yours

1 Answer

"The first issue is the type of the data (a binary string), but I guess there is a workaround using a BIF such as binary_to_tuple or something like that."

In all languages, you have to figure out how to parse the data you read from a socket.
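As a rough illustration of that parsing step, here is a minimal sketch that splits a reply shaped like the one in the question into its status line, header lines and body. The module name spamd_parse_sketch and the function parse/1 are made up for this example, and it assumes the whole reply arrived in a single TCP message, which is not guaranteed in general.

```erlang
-module(spamd_parse_sketch).
-export([parse/1]).

%% Hypothetical helper: split a SPAMD-style reply into its parts.
%% Assumes Data holds the complete reply; a real client may need to
%% accumulate several {tcp, Sock, Data} chunks first.
parse(Data) when is_binary(Data) ->
    %% The protocol header block ends at the first blank line.
    case binary:split(Data, <<"\r\n\r\n">>) of
        [Head, Body] ->
            [StatusLine | HeaderLines] =
                binary:split(Head, <<"\r\n">>, [global]),
            {ok, StatusLine, HeaderLines, Body};
        [_] ->
            %% No blank line yet, so the header block is not complete.
            {error, incomplete}
    end.
```

The pieces come back as binaries, so lines such as the "Spam: ..." header can be matched further with binary pattern matching or binary:split/2.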

"What I struggle to understand is how I can pass the right RoutingKey: since Erlang is functional, there are no side effects or assignment."

That is the party line, but in reality the parameter variables of a recursive function can be used to store values. In your case, you can store the routing key in the State variable, which is then available in all gen_server callback functions. State can be a 30-element tuple if you want, so there is no limit to how much info you can store in the State variable.
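A minimal sketch of that idea follows. It is not your actual code: the module name key_state_sketch, the {deliver, Key, Sock} message (a stand-in for the #'basic.deliver'{} clause) and the Sink pid (a stand-in for the publishing code) are all invented for illustration. It also assumes calcule_score/1 would be changed to return the socket it opened, so that each routing key can be stored under its socket and found again when the {tcp, Sock, Data} reply arrives.

```erlang
-module(key_state_sketch).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Sink is a hypothetical pid standing in for the AMQP publisher.
start_link(Sink) ->
    gen_server:start_link(?MODULE, Sink, []).

init(Sink) ->
    %% State: the publisher stand-in plus a map of Socket => RoutingKey.
    {ok, #{sink => Sink, pending => #{}}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Stand-in for the basic.deliver clause: remember the key under the
%% socket that was (by assumption) returned by the scoring call.
handle_info({deliver, Key, Sock}, State = #{pending := Pending}) ->
    {noreply, State#{pending := Pending#{Sock => Key}}};

%% Reply from the scorer: look the key up by socket, then forget it.
handle_info({tcp, Sock, Data}, State = #{sink := Sink, pending := Pending}) ->
    case maps:take(Sock, Pending) of
        {Key, Rest} ->
            Sink ! {publish, Key, Data},
            {noreply, State#{pending := Rest}};
        error ->
            %% Reply for a socket we never recorded: ignore it.
            {noreply, State}
    end;

handle_info(_Other, State) ->
    {noreply, State}.
```

Keying the map by socket rather than keeping a single key in State means several emails can be in flight at once without their routing keys overwriting each other.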

Another option is to use an ets/dets table, i.e. an Erlang database, to store messages with routing keys until you are ready to send everything to some other process.
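A small sketch of the ets variant, under the same assumption that something like the socket can serve as the lookup key. The module key_table_sketch and its three functions are hypothetical names, not part of any library.

```erlang
-module(key_table_sketch).
-export([new/0, remember/3, take/2]).

%% Create an unnamed ets table; the caller keeps the returned table id.
new() ->
    ets:new(pending_keys, [set, public]).

%% Store the routing key under the socket (or any other term).
remember(Table, Sock, Key) ->
    true = ets:insert(Table, {Sock, Key}),
    ok.

%% Fetch and delete the routing key in one step.
take(Table, Sock) ->
    case ets:take(Table, Sock) of
        [{Sock, Key}] -> {ok, Key};
        []            -> error
    end.
```

The table belongs to the process that created it and disappears when that process exits, so in a gen_server it would typically be created in init/1 and its id kept in State.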

{RoutingKey, Message} = ...

"However, I would like to reassemble every processed message with the right routing key matched 5 lines above."

If you are within the same function, what prevents you from using the routing key and message that you have in the variables RoutingKey and Message? I'm unclear how there is an issue if all the code is within one function. I would think you could do something like this:

{RoutingKey, Message} = ...,
ProcessedMsg = process_this(Message),
{RoutingKey, ProcessedMsg}

I suggest that you post a simple example of your problem, without all the complex matching and amqp_channel stuff, to distill the problem down to its core, e.g.

handle_info(Msg, State) -> 
    RoutingKey = 3,
    ProcessedMsg = "hello",

    %% Here, I want to write: ....