I have a message queue using ActiveMQ. A web request puts messages into the queue with persistency=True. Now, I have 2 consumers that are both connected as separate sessions to this queue. Consumer 1 always acknowledges the message, but consumer 2 never does.
Now, I read this http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html which states:
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
What I understand from this is that I would expect all messages to eventually be processed by consumer 1 since it always acknowledges. Since consumer 2 does not acknowledge, the message should then get sent to consumer 1.
But what I am noticing is the following: 1. When I submit a request, I see only every 2nd request coming to consumer 1. The other request does not show up and it stored in ActiveMQ. I suppose it went to consumer 2 who did not acknowledge. So should it come to consumer 1 next?
I just need to ensure that the message gets processed by one consumer only. In my case, this consumer is a machine in country (site) X. Each message needs to be handled in only one country (machine). But all countries (machines) should received the message. If the country id matches in the message, it will acknowledge. So only 1 acknowledgement/message will be sent.
My code to receive/process messages looks like this:
# --------------------------------------------- MODULE IMPORT ---------------------------------------------------------#
import argparse
import json
import logging
import multiprocessing as mp
import sys
import stomp
from tvpv_portal.services.msgbkr import MsgBkr
from utils import util
# --------------------------------------------- DEVELOPMENT CODE ------------------------------------------------------#
log = logging.getLogger(__name__)
class MessageProcessingListener(stomp.ConnectionListener):
"""This class is responsible for processing (consuming) the messages from ActiveMQ."""
def __init__(self, conn, cb):
"""Initialization.
Args:
conn -- Connection object
cb -- Callback function
"""
self._conn = conn
self._cb = cb
def on_error(self, headers, body):
"""When we get an error.
Args:
headers -- Message header
body -- Message body
"""
log.error('Received error=%s', body)
def on_message(self, headers, body):
"""When we receive a message.
Args:
headers -- Message header
body -- Message body
"""
log.info('Received message')
# Deserialize the message.
item = json.loads(body)
import pprint
pprint.pprint(item)
# TODO: check if msg is to be handled by this SITE. If so, acknowledge and queue it. Otherwise, ignore.
# Put message into queue via callback (queue.put) function.
#self._cb(item)
# TODO: we only send acknowledge if we are supposed to process this message.
# Send acknowledgement to ActiveMQ indicating message is consumed.
self._conn.ack(headers['message-id'], headers['subscription'])
def worker(q):
"""Worker to retrieve item from queue and process it.
Args:
q -- Queue
"""
# Run in an infinite loop. Get an item from the queue to process it. We MUST call q.task_done() to indicate
# that item is processed to prevent deadlock.
while True:
try:
item = q.get()
# TODO: We will call external script from here to run on Netbatch in the future.
log.info('Processed message')
finally:
q.task_done()
def flash_mq_rst_handler_main():
"""Main entry to the request handler."""
# Define arguments.
parser = argparse.ArgumentParser(description='Flash message queue request handler script',
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=False)
opts = parser.add_argument_group('Options')
opts.add_argument('-h', '--help', action='help',
help='Show this help message and exit')
opts.add_argument('--workers', metavar='val', type=int, default=4,
help='Number of worker processes')
opts.add_argument('--log', metavar='file', type=util.get_resolved_abspath, default='flash_mq_rst_handler.log',
help='Log file')
# Parse arguments.
args = parser.parse_args()
# Setup logger.
util.configure_logger(args.log)
log.info('Command line %s', ' '.join(map(str, sys.argv)))
# Create a managed queue to store messages retrieved from message queue.
queue = mp.Manager().JoinableQueue()
# Instantiate consumer message broker + ensure connection.
consumer = MsgBkr(producer=False)
if not consumer.is_connected():
log.critical('Unable to connect to message queue; please debug')
sys.exit(1)
# Register listener with consumer + queue.put as the callback function to trigger when a message is received.
consumer.set_listener('message_processing_listener', MessageProcessingListener, cb=queue.put)
# Run in an infinite loop to wait form messages.
try:
log.info('Create pool with worker=%d to process messages', args.workers)
with mp.Pool(processes=args.workers) as pool:
p = pool.apply_async(worker, (queue,))
p.get()
except KeyboardInterrupt:
pass
# See MsgBkr. It will close the connection during exit() so we don't have to do it.
sys.exit(0)
if __name__ == '__main__':
flash_mq_rst_handler_main()