1
votes

I have an application which sends messages to a queue, and another application which subscribes to the queue and process it. I want OTP messages to be given higher priority than other messages, hence I am trying to use ActiveMQ message priority to achieve this.

This is the code for ActiveMQ connection using STOMP protocol in nodejs using stompit library:

const serverPrimary = {
  host: keys.activeMQ.host,
  port: keys.activeMQ.port,
  ssl: ssl,
  connectHeaders: {
    host: '/',
    login: keys.activeMQ.username,
    passcode: keys.activeMQ.password,
    'heart-beat': '5000,5000',
  },
}
connManager = new stompit.ConnectFailover(
  [serverPrimary, serverFailover],
  reconnectOptions,
)
connManager.on('error', function (e) {
  const connectArgs = e.connectArgs
  const address = connectArgs.host + ':' + connectArgs.port
  logger.error({ error: e, customMessage: address })
})

    channelPool = new stompit.ChannelPool(connManager)

Code for sending message

const pushMessageToAMQ = (queue, message) => {
      const queues = Object.values(activeMQ.queues)
      if (!queues.includes(queue)) {
        _mqLog(mqLogMessages.unknownQueue + queue)
        return
      }
      //Priority header is set
      const header = {
        destination: queue,
        priority: 7
      }
      //If message is not a string
      if (typeof message !== 'string') message = JSON.stringify(message)
      //Logging message before sending
      _mqLog(
        mqLogMessages.sending,
        { service: services.amq },
        { header: header, message: message },
      )
      //Sending message to amq
      _sendMessageToAMQ(header, message, error => {
        if (error) {
          _mqError(error, mqLogMessages.sendingError, { service: services.amq })
        }
      })
    }

const _sendMessageToAMQ = (headers, body, callback) => {
      channelPool.channel((error, channel) => {
        if (error) {
          callback(error)
          return
        }
        channel.send(headers, body, callback)
      })
    }

Here's the code for subscribing to queue in the second application:

const amqSubscribe = (queue, callback, ack = 'client-individual') => {
  log({ customMessage: 'Subscribing to ' + queue })
  const queues = Object.values(activeMQ.queues)
  if (!queues.includes(queue)) {
    return
  }
  channelPool.channel((error, channel) => {
    let header = {
      destination: queue,
      ack: ack,
      'activemq.prefetchSize': 1,
    }
    //Check for error
    if (error) {
      _mqError(error, mqLogMessages.baseError, header)
    } else {
      channel.subscribe(
        header,
        _synchronisedHandler((error, message, next) => {
          //Check for error
          if (error) {
            _mqError(error, mqLogMessages.subscriptionError, header)
            next()
          } else {
            //Read message
            message.readString('utf-8', function (error, body) {
              if (error) {
                _mqError(error, mqLogMessages.readError, header)
                next()
              } else {
                //Message read successfully call callback
                callback(body, () => {
                  //Acknowledgment callback
                  channel.ack(message)
                  next()
                })
              }
            })
          }
        }),
      )
    }
  })
}

Activemq.xml

<policyEntries>
            <policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
.......

I tried pushing different messages with different priority and turned on the second application (i.e. the one which subscribes to the messages) after all the messages were pushed to queue. However, the execution order of the messages was the same as the one which was sent. The priority didn't change anything. Is there something that I am missing? Do I have to add something in consumer end for it to work?

1
What ActiveMQ backend do you have? And what priorities did you send?aventurin
I used priority 3 and 7 as values. I am locally using activemq 5.16.0. In production i will be using amazon MQ as the broker.Sangeeth Mukundan

1 Answers

1
votes

Support for priority is disabled by default in ActiveMQ "Classic" (used by Amazon MQ). As the documentation states:

...support [for message priority] is disabled by default so it needs to be be enabled using per destination policies through xml configuration...

You need to set prioritizedMessages="true" in the policyEntry for your queue, e.g.:

 <destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" prioritizedMessages="true"/>
      ...

To be clear, this is configured on the broker (i.e. not the client) in activemq.xml, and it applies to every kind of client.