0
votes

In our current application using JBoss EAP 6.2, we have many batch jobs triggered by remote EJB invocations. In order to centralize all notification logic for these jobs we are deciding to route all calls through an MDB by passing a serialized message. The intended flow is as below:

  • Batch job client sends message to a remote queue
  • MDB listens on this remote queue, process message and invokes EJB
  • DLQ is configured to process notifications when all retries are exhausted
  • Notification should also be sent on each retry. To avoid too many notifications, retry interval is sufficiently high

To handle the last point, I tried creating a Reply queue by setting it in the JMSReplyTo header. To simulate above flow, I have created the below MDB implementations...

Main MDB:

@MessageDriven(name = "MiddleManMDB", activationConfig = {
      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/test"),
      @ActivationConfigProperty(propertyName = "connectorClassName", propertyValue = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
      @ActivationConfigProperty(propertyName = "connectionParameters", propertyValue = "host=localhost;port=5445"),
      @ActivationConfigProperty(propertyName = "user", propertyValue = "queueuser"),
      @ActivationConfigProperty(propertyName = "password", propertyValue = "queuepassword")
})
public class MiddleManMDB implements MessageListener {

   private static final Logger LOGGER = LoggerFactory.getLogger(MiddleManMDB.class);

   @Resource(name = "java:/JmsXA")
   private ConnectionFactory connectionFactory;

   /*
    * (non-Javadoc)
    * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
    */
   @Override
   public void onMessage(Message message)
   {
      try {

         if (message instanceof TextMessage) {
            LOGGER.info("Received text message --> {}", ((TextMessage)message).getText());
         }

         throw new JMSException("thrown exception");
      }
      catch (Exception e) {
         sendToReplyQueue(e.getMessage(), message);

         LOGGER.info("Throwing exception to simulate retry...");
         throw new RuntimeException(e);
      }
   }

   private void sendToReplyQueue(String errorMessage, Message message)
   {
      Context context = null;
      Connection conn = null;

      LOGGER.info("Sending exception details to reply queue...");

      try {
         context = new InitialContext();

         conn = connectionFactory.createConnection();
         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination jmsReplyTo = message.getJMSReplyTo();
         MessageProducer replyProducer = session.createProducer(jmsReplyTo);
         replyProducer.send(jmsReplyTo, session.createTextMessage(errorMessage));

      }
      catch (NamingException | JMSException e) {
         e.printStackTrace();
      }
      finally {
         // close connection and context
      }
   }
}

Reply MDB:

@MessageDriven(name = "ReplyMDB", activationConfig = {
      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/reply")
})
public class ReplyMDB implements MessageListener {

   private static final Logger LOGGER = LoggerFactory.getLogger(ReplyMDB.class);

   @Override
   public void onMessage(Message message) {
      try {
         if (message instanceof TextMessage) {
            LOGGER.info("Received reply message --> " + ((TextMessage)message).getText());
         }
      }
      catch (JMSException e) {
         LOGGER.error("Error in reply queue...", e);
      }
   }
}

** Dead Letter MDB:**

@MessageDriven(name = "DeadLetterMDB", activationConfig = {
      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/dead")
})
public class DeadLetterMDB implements MessageListener {

   private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterMDB.class);

   @Override
   public void onMessage(Message message) {
      try {
         LOGGER.info("Message has arrived in dead letter queue");
         LOGGER.info("Current delivery count - {}", message.getIntProperty("JMSXDeliveryCount"));

         if (message instanceof TextMessage) {
            LOGGER.info("Received text message --> {}", ((TextMessage)message).getText());
         }
      }
      catch (JMSException e) {
         e.printStackTrace();
      }
   }
}

** Client:**

public static void main(String[] args) {
   Connection connection = null;
   Context context = null;

   try {
      // create context and connection factory

      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Destination destination = (Destination) context.lookup("jms/queue/test");
      Destination replyDest = (Destination) context.lookup("jms/queue/reply");
      MessageProducer producer = session.createProducer(destination);
      connection.start();

      TextMessage message = session.createTextMessage("Hello World");
      message.setJMSReplyTo(replyDest);

      producer.send(message);
   }
   catch (NamingException | JMSException e) {
      e.printStackTrace();
   }
   finally {
      // close context and connection
   }
}

** Relevant entries in standalone-full.xml:**

<address-settings>
  <address-setting match="jms.queue.testQueue">
      <dead-letter-address>jms.queue.DLQ</dead-letter-address>
      <expiry-address>jms.queue.ExpiryQueue</expiry-address>
      <redelivery-delay>1000</redelivery-delay>
      <max-delivery-attempts>3</max-delivery-attempts>
      <max-size-bytes>10485760</max-size-bytes>
      <address-full-policy>BLOCK</address-full-policy>
      <message-counter-history-day-limit>10</message-counter-history-day-limit>
  </address-setting>
  <address-setting match="jms.queue.replyQueue">
      <redelivery-delay>1000</redelivery-delay>
      <max-delivery-attempts>3</max-delivery-attempts>
      <max-size-bytes>10485760</max-size-bytes>
      <address-full-policy>BLOCK</address-full-policy>
      <message-counter-history-day-limit>10</message-counter-history-day-limit>
  </address-setting>
  <address-setting match="jms.queue.DLQ">
      <redelivery-delay>1000</redelivery-delay>
      <max-delivery-attempts>3</max-delivery-attempts>
      <max-size-bytes>10485760</max-size-bytes>
      <address-full-policy>BLOCK</address-full-policy>
      <message-counter-history-day-limit>10</message-counter-history-day-limit>
  </address-setting>
</address-settings>

<jms-destinations>
  <jms-queue name="testQueue">
      <entry name="queue/test"/>
      <entry name="java:jboss/exported/jms/queue/test"/>
  </jms-queue>
  <jms-queue name="replyQueue">
      <entry name="queue/reply"/>
      <entry name="java:jboss/exported/jms/queue/reply"/>
  </jms-queue>
  <jms-queue name="DLQ">
      <entry name="queue/dead"/>
      <entry name="java:jboss/exported/jms/queue/dead"/>
  </jms-queue>
  <jms-topic name="testTopic">
      <entry name="topic/test"/>
      <entry name="java:jboss/exported/jms/topic/test"/>
  </jms-topic>
</jms-destinations>

Now with the above flow in the MDBs, the message is never received in the reply queue. All three queues are deployed on the same server.

I am guessing the reason is the below line:

sendToReplyQueue(e.getMessage(), message);
LOGGER.info("Throwing exception to simulate retry...");
throw new RuntimeException(e);

Since the send is asynchronous and I am throwing an RTE (to trigger retry), the message is somehow never sent. Is there a way to resolve this problem ?

1

1 Answers

0
votes

I am guessing the reason is the below line......

You can try with commenting RTE. Also add some more logger to trace. check if reply destination set properly or not.

 message.setJMSReplyTo(replydestination);      
 LOGGER.info("Reply to:   " + message.getJMSReplyTo());

or message sent to replay queue or not

  replyProducer.send(jmsReplyTo, session.createTextMessage(errorMessage));
  LOGGER.info("exception details sent to reply queue...");