ActiveMQ is losing a lot of messages when failover is involved (only on topics). The producer writes 1000 messages to the topic while, at the same time, the consumer is reading from the same topic. In the middle of the process, I shut down the ActiveMQ master, and the process continues with the ActiveMQ slave. During the transition, a lot of messages are lost (~100 messages). The product I'm working on must not lose messages. What can I do to get persistence on topics? Producer:
#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <decaf\lang\Throwable.h>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
// Broker URI handed to the connection factory: a failover transport listing
// two brokers, with reconnect tuning passed as query options.
std::string _amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";

// Credentials passed to createConnection().
const std::string _username = "user";
const std::string _password = "pass";

// NOTE(review): not referenced anywhere in the visible producer code.
const std::string _host = "localhost";

// Name of the topic the producer publishes to.
const std::string _destination = "Test.AMQ.bogcretu.Topic";

// Filler fragment that CreateMessage() repeats to build _bodyMessage.
std::string _garbageMessage = "GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9";

// Number of messages main() sends.
int _countMessages = 1000;

// How many copies of _garbageMessage CreateMessage() appends.
int _multiplyFactor = 100;

// Accumulated filler payload; starts empty and is grown by CreateMessage().
std::string _bodyMessage;
// Grows the global _bodyMessage by appending _garbageMessage to it
// _multiplyFactor times. Nothing is appended when _multiplyFactor <= 0.
void CreateMessage()
{
    int remaining = _multiplyFactor;
    while (remaining > 0) {
        _bodyMessage.append(_garbageMessage);
        --remaining;
    }
}
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
CreateMessage();
activemq::core::ActiveMQConnectionFactory factory;
factory.setBrokerURI(_amqURI);
std::auto_ptr<cms::TextMessage> message;
std::auto_ptr<cms::Connection> connection(factory.createConnection(_username, _password));
connection->start();
std::auto_ptr<cms::Session> session(connection->createSession());
std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));
producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
for (int i = 0; i < _countMessages; i++) {
std::stringstream ss;
ss << i;
std::string number = ss.str();
message.reset(session->createTextMessage(number));
producer->send(message.get());
std::cout << i << std::endl;
}
//message.reset(session->createTextMessage("DONE"));
//producer->send(message.get());
//connection->close();
//activemq::library::ActiveMQCPP::shutdownLibrary();
return 0;
}
Consumer:
#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>
// Broker URI used by the consumer: a failover transport listing two brokers,
// with reconnect tuning passed as query options.
std::string amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";
class MsgListener : public cms::MessageListener
{
public:
std::string _amqURI;
cms::Connection *_connection;
cms::Session* _session;
cms::Destination* _destination;
cms::MessageConsumer* _consumer;
bool _sessionTransacted;
bool _useTopic;
MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
{
this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
this->_connection->start();
/*if (this->_sessionTransacted == true) {
this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
}
else {
this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
}*/
this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);
if (useTopic) {
this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
}
else {
this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
}
this->_consumer = this->_session->createConsumer(this->_destination);
this->_consumer->setMessageListener(this);
/*std::cout.flush();
std::cerr.flush();*/
}
~MsgListener()
{
}
void onMessage(const cms::Message* CMSMessage)
{
static int count = 0;
try
{
const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
std::string text = "";
if (textMessage != NULL) {
text = textMessage->getText();
}
else {
text = "NOT A TEXTMESSAGE!";
}
std::cout << "(" << count << ", " << text << ")" << std::endl;
count++;
}
catch (cms::CMSException& e)
{
e.printStackTrace();
}
if (this->_sessionTransacted) {
this->_session->commit();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
};
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
MsgListener consumer(amqURI, true, true);
while (true);
//activemq::library::ActiveMQCPP::shutdownLibrary();
}
Consumer_durable:
#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>
// Broker URI used by the durable consumer: a failover transport listing two
// brokers, with reconnect tuning passed as query options.
std::string amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";
class MsgListener : public cms::MessageListener
{
public:
std::string _amqURI;
cms::Connection *_connection;
cms::Session* _session;
cms::Destination* _destination;
cms::MessageConsumer* _consumer;
bool _sessionTransacted;
bool _useTopic;
MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
{
this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
this->_connection->start();
/*if (this->_sessionTransacted == true) {
this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
}
else {
this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
}*/
this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);
if (useTopic) {
this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
}
else {
this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
}
//this->_consumer = this->_session->createConsumer(this->_destination);
static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
this->_consumer = this->_session->createDurableConsumer(topic, "sub_name", "");
this->_consumer->setMessageListener(this);
/*std::cout.flush();
std::cerr.flush();*/
}
~MsgListener()
{
}
void onMessage(const cms::Message* CMSMessage)
{
static int count = 0;
try
{
const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
std::string text = "";
if (textMessage != NULL) {
text = textMessage->getText();
}
else {
text = "NOT A TEXTMESSAGE!";
}
std::cout << "(" << count << ", " << text << ")" << std::endl;
count++;
}
catch (cms::CMSException& e)
{
e.printStackTrace();
}
if (this->_sessionTransacted) {
this->_session->commit();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
};
int main()
{
activemq::library::ActiveMQCPP::initializeLibrary();
MsgListener consumer(amqURI, true, true);
while (true);
//activemq::library::ActiveMQCPP::shutdownLibrary();
}