I implemented a consumer, which will reconnect to broker automatically after a while if underlying connection is closed. My case is as below:
- Launch RabbitMQ server successfully.
- Launch consumer successfully.
- Published a message, and consumer received it successfully.
Stop RabbitMQ server, consumer will show an exception:
com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}.
And then consumer will sleep 60 seconds before reconnect.
- Launch RabbitMQ server again.
- Publish a message successfully, the result of command 'list_queues' is 0
- After 60 seconds, consumer connect to RabbitMQ again, however now messages received which are published at step#6.
- Publish the 3rd message, consumer received it successfully.
In this case, all messages published before reconnect will be lost. Also I performed another experiment.
- Launch RabbitMQ, and publish a message successfully(no consumer process launched).
- Stop RabbitMQ, then restart it.
- Launch consumer process, receive the message published at step#1 successfully.
Note:The QOS of consumer is 1. I have researched RabbitMQ several days, in my understanding, consumer should get the message published before reconnect. Pls help(I ran test based on windows rabbitMQ).
Below is the PUBLISHER:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel = conn.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2
// (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();
And the CONSUMER is as below:
@Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
final int qos) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
while (true) {
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic");
// declare a durable, non-exclusive, non-autodelete queue.
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// distribute workload among all consumers, consumer will
// pre-fetch
// {qos}
// messages to local buffer.
channel.basicQos(qos);
logger.debug(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// disable auto-ack. If enable auto-ack, RabbitMQ delivers a
// message to
// the customer it immediately removes it from memory.
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
try {
RabbitMessageConsumer.this.consumeMessage(delivery);
}
catch (Exception e) {
// the exception shouldn't affect the next message
logger.info("[IGNORE]" + e.getMessage());
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
catch (Exception e) {
logger.warn(e);
}
if (autoReconnect) {
this.releaseConn(connection);
logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
+ this.reconnectInterval / 1000 + " seconds.");
Thread.sleep(this.getReconnectInterval());
}
else
break;
}
}
private void releaseConn(Connection conn) {
try {
if (conn != null)
conn.close();
}
catch (Exception e) {
// simply ignore this exception
}
}
As it is a 'topic' exchange, no queue is declared at PUBLISHER. However at step#3 of 1st test, the durable queue has been declared, and the message is durable as well. I don't understand why message will be lost before reconnect.