1
votes

ActiveMQCPP version: 3.7.1 AcitveMQBroker version: 5.10.0

Here is a simple example. The code includes both consumer and producer

// START SNIPPET: demo 

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>

#include <decaf/util/Random.h>

using namespace activemq::core; 
using namespace decaf::util::concurrent; 
using namespace decaf::util; 
using namespace decaf::lang; 
using namespace cms; 
using namespace std; 

#define  QUEUE_NAME    "eventQueue" 
#define NAME_BYTE_LEN        16 

class HelloWorldProducer : public ExceptionListener, 
        public MessageListener, 
        public Runnable { 
private: 
        CountDownLatch latch; 
        CountDownLatch doneLatch; 
        Connection* connection; 
        Session* session; 
        Destination* destination; 
        MessageProducer* producer; 
        int numMessages; 
        bool useTopic; 
        bool sessionTransacted; 
        std::string brokerURI; 
        bool bReciveMessage; 
        long waitMillis; 

private: 

        HelloWorldProducer(const HelloWorldProducer&); 
        HelloWorldProducer& operator=(const HelloWorldProducer&); 

public: 

        HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, 
                long waitMillis = 3000) : 
                latch(1), 
                doneLatch(numMessages), 
                connection(NULL), 
                session(NULL), 
                destination(NULL), 
                producer(NULL), 
                numMessages(numMessages), 
                useTopic(useTopic), 
                sessionTransacted(sessionTransacted), 
                brokerURI(brokerURI), 
                bReciveMessage(false), 
                waitMillis(waitMillis) 
        { } 

        virtual ~HelloWorldProducer() { 
                cleanup(); 
        } 

        void close() { 
                this->cleanup(); 
        } 

        void waitUntilReady() { 
                latch.await(); 
        } 

        virtual void run() { 

                try { 

                        // Create a ConnectionFactory 
                        auto_ptr<ConnectionFactory> connectionFactory( 
                                ConnectionFactory::createCMSConnectionFactory(brokerURI)); 

                        // Create a Connection 
                        connection = connectionFactory->createConnection(); 
                        connection->start(); 

                        // Create a Session 
                        if (this->sessionTransacted) { 
                                session = connection->createSession(Session::SESSION_TRANSACTED); 
                        } 
                        else { 
                                session = connection->createSession(Session::AUTO_ACKNOWLEDGE); 
                        } 

                        session = connection->createSession(); 
                        // Create the destination (Topic or Queue) 
                        if (useTopic) { 
                                destination = session->createTopic(QUEUE_NAME); 
                        } 
                        else { 
                                destination = session->createQueue(QUEUE_NAME); 
                        } 

                        // Create a MessageProducer from the Session to the Topic or Queue 
                        producer = session->createProducer(destination); 
                        producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); 

                        // Create the Thread Id String 
                        string threadIdStr = Long::toString(Thread::currentThread()->getId()); 

                        // Create a messages 
                        string text = (string) "Hello world! from thread " + threadIdStr; 

                        for (int ix = 0; ix < numMessages; ++ix) { 
                                std::auto_ptr<TextMessage> message(session->createTextMessage(text)); 

                                //????... 
                                std::auto_ptr<Destination> tempDest(session->createTemporaryQueue()); 

                                //cms::Destination tempDest=session->createTemporaryTopic() ; 
                                MessageConsumer * responseConsumer = session->createConsumer(tempDest.get()); 
                                responseConsumer->setMessageListener(this);//??... 


                                message->setCMSReplyTo(tempDest.get()); 
                                Random random; 
                                char buffer[NAME_BYTE_LEN] = { 0 }; 
                                random.nextBytes((unsigned char *)buffer, NAME_BYTE_LEN); 
                                string correlationId = ""; 
                                for (int i = 0; i < NAME_BYTE_LEN; ++i) 
                                { 
                                        char ch[NAME_BYTE_LEN * 2] = { 0 }; 
                                        sprintf(ch, "%02X", (unsigned char)buffer[i]); 
                                        string str(ch); 

                                        correlationId += str; 
                                } 

                                message->setCMSCorrelationID(correlationId); 

                                message->setIntProperty("Integer", ix); 
                                printf("Producer Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str()); 
                                producer->send(message.get()); 

                                // Indicate we are ready for messages. 
                                latch.countDown(); 

                                // Wait while asynchronous messages come in. 
                                doneLatch.await(waitMillis); 

                        } 
                } 
                catch (CMSException& e) { 
                        printf("Producer run() CMSException \n"); 
                        // Indicate we are ready for messages. 
                        latch.countDown(); 
                        e.printStackTrace(); 
                } 


        } 


        // Called from the Producer since this class is a registered MessageListener. 
        virtual void onMessage(const Message* message) { 

                static int count = 0; 

                try { 
                        count++; 
                        const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message); 
                        //ActiveMQMessageTransformation 
                        //std::auto_ptr<TextMessage> responsemessage(session->createTextMessage()); 
                        //responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); 
                        //responsemessage->getCMSReplyTo() 

                        string text = ""; 

                        if (textMessage != NULL) { 
                                text = textMessage->getText(); 
                        } 
                        else { 
                                text = "NOT A TEXTMESSAGE!"; 
                        } 

                        printf("Producer Message #%d Received: %s\n", count, text.c_str()); 


                        //producer.send 

                } 
                catch (CMSException& e) { 
                        printf("Producer onMessage() CMSException \n"); 
                        e.printStackTrace(); 
                } 

                // Commit all messages. 
                if (this->sessionTransacted) { 
                        session->commit(); 
                } 

                // No matter what, tag the count down latch until done. 
                doneLatch.countDown(); 
        } 

        // If something bad happens you see it here as this class is also been 
        // registered as an ExceptionListener with the connection. 
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) { 
                printf("Producer onException() CMS Exception occurred.  Shutting down client. \n"); 
                ex.printStackTrace(); 
                exit(1); 
        } 


private: 

        void cleanup() { 

                if (connection != NULL) { 
                        try { 
                                connection->close(); 
                        } 
                        catch (cms::CMSException& ex) { 
                                ex.printStackTrace(); 
                        } 
                } 

                // Destroy resources. 
                try { 
                        delete destination; 
                        destination = NULL; 
                        delete producer; 
                        producer = NULL; 
                        delete session; 
                        session = NULL; 
                        delete connection; 
                        connection = NULL; 
                } 
                catch (CMSException& e) { 
                        e.printStackTrace(); 
                } 
        } 
}; 

class HelloWorldConsumer : public ExceptionListener, 
        public MessageListener, 
        public Runnable { 

private: 

        CountDownLatch latch; 
        CountDownLatch doneLatch; 
        Connection* connection; 
        Session* session; 
        Destination* destination; 
        MessageConsumer* consumer; 
        MessageProducer *producer; 
        long waitMillis; 
        bool useTopic; 
        bool sessionTransacted; 
        std::string brokerURI; 

private: 

        HelloWorldConsumer(const HelloWorldConsumer&); 
        HelloWorldConsumer& operator=(const HelloWorldConsumer&); 

public: 

        HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) : 
                latch(1), 
                doneLatch(numMessages), 
                connection(NULL), 
                session(NULL), 
                destination(NULL), 
                consumer(NULL), 
                producer(NULL), 
                waitMillis(waitMillis), 
                useTopic(useTopic), 
                sessionTransacted(sessionTransacted), 
                brokerURI(brokerURI) { 
        } 

        virtual ~HelloWorldConsumer() { 
                cleanup(); 
        } 

        void close() { 
                this->cleanup(); 
        } 

        void waitUntilReady() { 
                latch.await(); 
        } 

        virtual void run() { 

                try { 

                        // Create a ConnectionFactory 
                        auto_ptr<ConnectionFactory> connectionFactory( 
                                ConnectionFactory::createCMSConnectionFactory(brokerURI)); 

                        // Create a Connection 
                        connection = connectionFactory->createConnection(); 
                        connection->start(); 
                        connection->setExceptionListener(this); 

                        // Create a Session 
                        if (this->sessionTransacted == true) { 
                                session = connection->createSession(Session::SESSION_TRANSACTED); 
                        } 
                        else { 
                                session = connection->createSession(Session::AUTO_ACKNOWLEDGE); 
                        } 

                        // Create the destination (Topic or Queue) 
                        if (useTopic) { 
                                destination = session->createTopic(QUEUE_NAME); 
                        } 
                        else { 
                                destination = session->createQueue(QUEUE_NAME); 
                        } 

                        producer = session->createProducer(); 
                        producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); 

                        // Create a MessageConsumer from the Session to the Topic or Queue 
                        consumer = session->createConsumer(destination); 

                        consumer->setMessageListener(this); 

                        std::cout.flush(); 
                        std::cerr.flush(); 

                        // Indicate we are ready for messages. 
                        latch.countDown(); 

                        // Wait while asynchronous messages come in. 
                        doneLatch.await(); 

                } 
                catch (CMSException& e) { 
                        printf("Consumer onException() CMS Exception occurred.  Shutting down client. \n"); 
                        // Indicate we are ready for messages. 
                        latch.countDown(); 
                        e.printStackTrace(); 
                } 
        } 

        // Called from the consumer since this class is a registered MessageListener. 
        virtual void onMessage(const Message* message) { 

                static int count = 0; 

                try { 
                        count++; 


                        // Create the Thread Id String 
                        string threadIdStr = Long::toString(Thread::currentThread()->getId()); 

                        static bool bPrintf = true; 
                        if (bPrintf) 
                        { 
                                bPrintf = false; 
                                printf("consumer Message threadid: %s\n", threadIdStr.c_str()); 
                        } 

                        string strReply = "consumer return  xxx,ThreadID=" + threadIdStr; 
                        const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message); 

                        if (NULL == textMessage) 
                        { 
                                printf("NULL==textMessage %s", message->getCMSType().c_str()); 
                                return; 
                        } 

                        std::auto_ptr<TextMessage> responsemessage(session->createTextMessage(strReply)); 
                        responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID()); 


                        string text = ""; 

                        if (textMessage != NULL) { 
                                text = textMessage->getText(); 
                        } 
                        else { 
                                text = "NOT A TEXTMESSAGE!"; 
                        } 

                        int nProPerty = textMessage->getIntProperty("Integer"); 
                        printf("consumer Message #%d Received: %s,nProPerty[%d]\n", count, text.c_str(), nProPerty); 


                        const cms::Destination* destSend = textMessage->getCMSReplyTo(); 
                        if (destSend) 
                        { 
                                this->producer->send(destSend, responsemessage.get()); 

                                printf("consumer Message #%d send: %s\n", count, strReply.c_str()); 
                        } 


                } 
                catch (CMSException& e) { 
                        printf("Consumer onMessage() CMS Exception occurred.  Shutting down client. \n"); 
                        e.printStackTrace(); 
                } 

                // Commit all messages. 
                if (this->sessionTransacted) { 
                        session->commit(); 
                } 

                // No matter what, tag the count down latch until done. 
                //doneLatch.countDown(); 
        } 

        // If something bad happens you see it here as this class is also been 
        // registered as an ExceptionListener with the connection. 
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) { 
                printf("Consumer onException() CMS Exception occurred.  Shutting down client. \n"); 
                //printf("CMS Exception occurred.  Shutting down client.\n"); 
                ex.printStackTrace(); 
                exit(1); 
        } 

private: 

        void cleanup() { 
                if (connection != NULL) { 
                        try { 
                                connection->close(); 
                        } 
                        catch (cms::CMSException& ex) { 
                                ex.printStackTrace(); 
                        } 
                } 

                // Destroy resources. 
                try { 
                        delete destination; 
                        destination = NULL; 
                        delete consumer; 
                        consumer = NULL; 
                        delete session; 
                        session = NULL; 
                        delete connection; 
                        connection = NULL; 
                } 
                catch (CMSException& e) { 
                        e.printStackTrace(); 
                } 
        } 
}; 

int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { 

        //if(argc<2) 
        //{ 
        //    printf("argc<2\r\n"); 
        //    return 0; 
        //} 

        activemq::library::ActiveMQCPP::initializeLibrary(); 
        { 
                std::cout << "=====================================================\n"; 
                std::cout << "Starting the example:" << std::endl; 
                std::cout << "-----------------------------------------------------\n"; 


                // Set the URI to point to the IP Address of your broker. 
                // add any optional params to the url to enable things like 
                // tightMarshalling or tcp logging etc.  See the CMS web site for 
                // a full list of configuration options. 
                // 
                //  http://activemq.apache.org/cms/
                // 
                // Wire Format Options: 
                // ========================= 
                // Use either stomp or openwire, the default ports are different for each 
                // 
                // Examples: 
                //    tcp://127.0.0.1:61616                      default to openwire 
                //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above 
                //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead 
                // 
                // SSL: 
                // ========================= 
                // To use SSL you need to specify the location of the trusted Root CA or the 
                // certificate for the broker you want to connect to.  Using the Root CA allows 
                // you to use failover with multiple servers all using certificates signed by 
                // the trusted root.  If using client authentication you also need to specify 
                // the location of the client Certificate. 
                // 
                //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" ); 
                //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" ); 
                //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" ); 
                // 
                // The you just specify the ssl transport in the URI, for example: 
                // 
                //     ssl://localhost:61617 
                // 
                std::string brokerURI = "tcp://127.0.0.1:61616?jms.watchTopicAdvisories=false"; 

                //============================================================ 
                // set to true to use topics instead of queues 
                // Note in the code above that this causes createTopic or 
                // createQueue to be used in both consumer an producer. 
                //============================================================ 
                bool useTopics = false; 
                bool sessionTransacted = true; 
                int numMessages = 1; 
                bool useConsumer = true; 
                bool useProducer = true; 

                //int nSet=atoi(argv[1]); 
                //if(1==nSet) 
                //{ 
                //#define USE_COMSUMER 


                //} 
                //else 
                //{ 
                //#define USE_PRODUCER 

                // 
                //} 



                long long startTime = System::currentTimeMillis(); 

#ifdef USE_PRODUCER 
                printf("?? USE_PRODUCER \r\n"); 

                int numProducerMessages = 30; 
                int nThreadNumber = 10; 
                vector<HelloWorldProducer *> vHelloWorldProducer; 
                for (int i = 0; i < nThreadNumber; ++i) 
                { 
                        HelloWorldProducer * producerTemp = new HelloWorldProducer(brokerURI, numProducerMessages, useTopics); 
                        vHelloWorldProducer.push_back(producerTemp); 
                } 

#endif 

#ifdef USE_COMSUMER 
                printf("?? USE_COMSUMER \r\n"); 
                HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted); 
                // Start the consumer thread. 
                Thread consumerThread(&consumer); 
                consumerThread.start(); 

                // Wait for the consumer to indicate that its ready to go. 
                consumer.waitUntilReady(); 

#endif 




#ifdef USE_PRODUCER 
                // Start the producer thread. 

                vector<Thread *> vThread; 
                for (int i = 0; i < nThreadNumber; ++i) 
                { 
                        HelloWorldProducer & ProducerTemp = *vHelloWorldProducer[i]; 
                        Thread * threadTemp = new Thread(&ProducerTemp); 
                        vThread.push_back(threadTemp); 
                        threadTemp->start(); 
                        ProducerTemp.waitUntilReady(); 

                } 

                for (size_t i = 0; i < vThread.size(); ++i) 
                { 
                        Thread * threadTemp = vThread[i]; 
                        //threadTemp->join(); 
                } 
                while (1) 
                { 
                        Thread::sleep(10); 
                } 

                //Thread producerThread1(&producer1); 
                //producerThread1.start(); 
                //producer1.waitUntilReady(); 

                //Thread producerThread2(&producer2); 
                //producerThread2.start(); 
                //producer2.waitUntilReady(); 

                //Thread producerThread3(&producer3); 
                //producerThread3.start(); 
                //producer3.waitUntilReady(); 
#endif 




#ifdef USE_PRODUCER 
                // Wait for the threads to complete. 
                //producerThread1.join(); 
                //producerThread2.join(); 
                //producerThread3.join(); 
#endif 

#ifdef USE_COMSUMER 
                consumerThread.join(); 
#endif 

                long long endTime = System::currentTimeMillis(); 
                double totalTime = (double)(endTime - startTime) / 1000.0; 

#ifdef USE_PRODUCER 
                //producer1.close(); 
                //producer2.close(); 
                //producer3.close(); 

                for (size_t i = 0; i < vHelloWorldProducer.size(); ++i) 
                { 
                        HelloWorldProducer * ProducerTemp = vHelloWorldProducer[i]; 
                        ProducerTemp->close(); 

                        if (ProducerTemp) 
                        { 
                                delete ProducerTemp; 
                                ProducerTemp = NULL; 
                        } 
                } 

#endif 
#ifdef USE_COMSUMER 
                consumer.close(); 
#endif 




                std::cout << "Time to completion = " << totalTime << " seconds." << std::endl; 
                std::cout << "-----------------------------------------------------\n"; 
                std::cout << "Finished with the example." << std::endl; 
                std::cout << "=====================================================\n"; 

        } 
        activemq::library::ActiveMQCPP::shutdownLibrary(); 


        return 0; 
} 

When I run the above example producer and consumer following happens:

  1. Producer is able to put the message on the queue.
  2. Consumer is able to retrieve the message from the queues.
  3. When consumer tries to send response back using replyTo desitnation, the send fails with error message listed above.

On the broker and consumer/producer I have advisorySupport turned OFF. When I turn them on this work fine.

What I would like to know: 1. How can I make the error go away but still have advisorySupport off.

Thanks a lot for your help.

1

1 Answers

0
votes

The error was resolved by downcasting cms::Connection to ActiveMQConnection and calling .setWatchTopicAdvisories(false).