
I have an external C++ STOMP client that sends a message to the queue myqueue and subscribes to the topic mytopic.

A Mule flow (Mule plugin in Eclipse Mars) is configured to receive the message; it modifies the message and returns it as an echo response:

<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ"/>
<http:listener-config name="HTTP" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>

<flow name="jmsFlow">
    <jms:inbound-endpoint queue="myqueue" connector-ref="Active_MQ" doc:name="JMS">
        <jms:transaction action="NONE"/>
    </jms:inbound-endpoint>
    <logger message="#[string: Logger1 Response: #[payload]]" level="INFO" doc:name="Logger1"/>
    <response>
        <echo-component doc:name="Echo"/>
    </response>
    <component class="org.mule.java.JavaClient" doc:name="Java"/>
</flow>

The C++ STOMP client:

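#include <cstring>      // strncpy
#include <iostream>     // cout, cerr
#include <string>
#include <windows.h>    // Sleep
// plus the BoostStomp header from your BoostStomp checkout

using namespace std;

static BoostStomp*  stomp_client;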
static string       notifications_topic = "mytopic";
static string       registration_queue = "myqueue";
static string       result("");
static int          number = 0;

bool subscription_callback(STOMP::Frame& _frame)
{
    number = _frame.body().v.size();
    result = _frame.body().c_str();
    return(true);
}

int main(int argc, char *argv[])
{
    string  stomp_host = "localhost";
    int     stomp_port = 61613;

    char* msg = argv[0];
    int nmbr = 1;
    char* mymsg = new char[nmbr + 1];   // one extra byte for the terminating '\0'
    strncpy(mymsg,msg,nmbr);
    mymsg[nmbr] = '\0';
    string msgstr = string(mymsg);
    delete[] mymsg;                     // msgstr holds its own copy
    try
    {
        // initiate a new BoostStomp client
        stomp_client = new BoostStomp(stomp_host, stomp_port);

        // start the client, (by connecting to the STOMP server)
        stomp_client->start();//(user, pass);

        // subscribe to a channel
        stomp_client->subscribe(notifications_topic, (STOMP::pfnOnStompMessage_t) &subscription_callback);

        // construct a headermap
        STOMP::hdrmap headers;
        string body = string("mymessage");

        // add an outgoing message to the queue
        stomp_client->send(registration_queue, headers, body);

        Sleep(10000);   // crude wait for a reply
        // result is filled in by subscription_callback; it stays empty if nothing arrived
        cout << "return message is " << result << endl;
        result.clear();

        Sleep(10000);
        stomp_client->unsubscribe(notifications_topic);
        Sleep(1000);
        stomp_client->stop();
        delete stomp_client;
    }
    catch (std::exception& e)
    {
        cerr << "Error in BoostStomp: " << e.what() << "\n";
        return 1;
    }
    cout << "Call of test works!" << endl;
    return 0;
}

I'm not sure how to send the modified message to the topic mytopic instead of returning an echo response. Any suggestions?

Another option might be a Java component that sends the message over STOMP:

StompConnection connection = new StompConnection();
connection.open("localhost", 61613);
connection.connect("","");

connection.send("/mytopic", msg.toString());

Receiving the message with

connection.subscribe("/mytopic", Subscribe.AckModeValues.CLIENT);
StompFrame frame = connection.receive();
System.out.println("JavaClient received message: " + frame.getBody());
connection.disconnect();

works, but the external C++ STOMP client still does not receive it. The client's console output is:

[12:03:28: 00625D48] BoostStomp:starting...
[12:03:28: 00625D48] BoostStomp:STOMP: Connecting to [::1]:61613...
[12:03:28: 00625D48] BoostStomp:STOMP TCP connection to [::1]:61613 is active
[12:03:28: 00625D48] BoostStomp:Sending CONNECT frame...
[12:03:28: 0062E698] BoostStomp:Worker thread: starting...
[12:03:28: 0062E698] BoostStomp:server supports STOMP version 1.1
waiting for answer
[12:03:29: 0062E698] BoostStomp:Sending SUBSCRIBE frame...
[12:03:29: 0062E698] BoostStomp:Sent!
[12:03:29: 0062E698] BoostStomp:Sending SEND frame...
[12:03:29: 0062E698] BoostStomp:Sent!
2

2 Answers

0
votes

The screenshot below shows a sample of how to send a message to your subscriber using a JMS (ActiveMQ) topic.

Active MQ send topic

0
votes

Hi @LeBro, see the sample source code below:

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:jms="http://www.mulesoft.org/schema/mule/jms" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">
    <jms:activemq-connector name="Active_MQ" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ"/>
    <jms:activemq-connector name="Active_MQ_Outbound" brokerURL="tcp://localhost:61616" validateConnections="true" doc:name="Active MQ"/>
    <flow name="active-mq-testFlow">
        <jms:inbound-endpoint queue="myqueue" connector-ref="Active_MQ" doc:name="JMS"/>
        <logger level="INFO" doc:name="Logger"/>
        <set-payload value="#['Transformed Message : ' + payload]" doc:name="Set Payload"/>
        <jms:outbound-endpoint topic="mytopic" connector-ref="Active_MQ_Outbound" doc:name="JMS Outbound"/>
    </flow>
</mule>

Try accessing your local ActiveMQ admin console at this URL: http://localhost:8161/

I created a message queue using the ActiveMQ admin console and then ran the Mule application.

The Mule app successfully consumed the message and then sent it to the topic.
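
If the C++ client still sees nothing after this, one thing worth checking is the destination name. ActiveMQ's STOMP connector expects destinations in the form /queue/<name> or /topic/<name>, so the JMS topic mytopic used by the outbound endpoint would be /topic/mytopic on the STOMP side, whereas the client in the question subscribes to plain mytopic. Below is a minimal sketch of a helper that adds the prefix. The names stomp_destination and DestinationKind are hypothetical and not part of BoostStomp:

```cpp
#include <cassert>
#include <string>

// Hypothetical helper, not part of BoostStomp: maps a plain JMS destination
// name to the "/queue/<name>" or "/topic/<name>" form used by ActiveMQ's
// STOMP connector.
enum class DestinationKind { Queue, Topic };

std::string stomp_destination(DestinationKind kind, const std::string& name)
{
    assert(!name.empty());  // an empty destination name is a caller error

    const std::string prefix =
        (kind == DestinationKind::Topic) ? "/topic/" : "/queue/";

    // Leave names that already carry the right prefix untouched.
    if (name.compare(0, prefix.size(), prefix) == 0) {
        return name;
    }
    return prefix + name;
}
```

With this, the client would store stomp_destination(DestinationKind::Topic, notifications_topic) in a string and pass that to subscribe, and do the same with DestinationKind::Queue and registration_queue for send. This is untested against BoostStomp itself, so treat it as a starting point.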

Sample topic created