0
votes

I have a dynamic topic tree on which the last message for each topic is "retained". When I subscribe to the topic tree (with JMS/Java/MQLibs), as it's dynamic and I am subscribing with the wildcard character "#", I cannot know what topics I will receive in advance. So,

  1. How can I know when I have read all available topics at least once?

  2. How can I know that the read topic is the original "retained" message that existed before I subscribed or is an update after subscription? (Do I need to keep a local map of MessageIDs against topics? I assume I cannot compare the time stamp of when I subscribed with the message creation time as the clock on the server and client may differ).

I have added some sample code, to show that I am accessing Topics not Queues.

public class JMSServerSubscriber implements MessageListener { 

public JMSServerSubscriber() throws JMSException { 
    TopicConnection topicCon = JmsTopicConnectionFactory.createTopicConnection(); 
    TopicSession topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
    Topic topic = topicSes.createTopic("#"); 
    TopicSubscriber topicSub = topicSes.createSubscriber(topic); 
    topicSub.setMessageListener(this); 
    topicCon.start(); 
} 

    @Override 
    public void onMessage(Message arg0) { 
            BytesMessage bm = (BytesMessage) arg0; 
            try { 
                    String destination = bm.getJMSDestination().toString(); 
            } catch (JMSException e) { 
                    e.printStackTrace(); 
            }  
    } 
} 
3
I don't understand the question. When you subscribe to a topic, you specify a queue into which MQ will put the messages published to that topic. When that queue is empty, you have read all the messages published to the topic. I don't see how subscribing to a part of the topic tree is different.Attila Repasi
Lets says the actual topic tree looks like this; /BusRoute/1 /BusRoute/2, and I subscribe to /BusRoute/#. I will get an immediate response for 1 and 2, but how would I know that that is all there is?lafual

3 Answers

0
votes

When there are no more publications, the consumer.receive() method will throw an exception with MQRC 2033 reason code. You can use the reason code to confirm you have read all publications.

On the received message call getIntProperty(JmsConstants.JMS_IBM_RETAIN); to know if the publication is retained or not.

0
votes

Here is my solution using PCF. Please excuse any syntax errors.

public class TopicChecker implements MessageListener { 

    PCFMessageAgent           agent; 
    PCFMessage                pcfm; 
    Set<String>               allTopics; 
    TopicConnection           topicCon; 
    TopicSession              topicSes; 
    Topic                     topic; 
    TopicSubscriber           topicSub; 
    JmsTopicConnectionFactory jcf; 
    int                       total; 

    public TopicChecker() throws Exception { 
        allTopics = new HashSet<>(); 
        MQEnvironment.hostname = "myMqServer"; 
        MQEnvironment.channel = "MY.CHANNEL"; 
        MQEnvironment.port = 1515; 
        MQQueueManager m = new MQQueueManager("MY.QUEUE.MANAGER"); 
        agent = new PCFMessageAgent(m); 
        pcfm = new PCFMessage(MQConstants.MQCMD_INQUIRE_TOPIC_STATUS); 
        pcfm.addParameter(MQConstants.MQCA_TOPIC_STRING, "MyRoot/#"); 
        PCFMessage[] responses = agent.send(pcfm); 

        for (PCFMessage response: responses) { 
            /* We only publish to leaf nodes, so ignore branches. */
            if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) { 
            allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING)); 
            } 
        } 

        total = allTopics.size(); 
        agent.disconnect(); 

        jcf = ...
        topicCon = jcf.createTopicConnection(); 
        topicSes = topicCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 
        topic = topicSes.createTopic("MyRoot/#"); 
        topicSub = topicSes.createSubscriber(topic); 
        topicSub.setMessageListener(this); 
        topicCon.start(); 
    } 

    @Override 
    public void onMessage(Message m) { 
     try { 
        String topicString = m.getJMSDestination().toString().replaceAll("topic://", ""); 
        allTopics.remove(topicString); 
        System.out.println("Read : " + topicString + " " + allTopics.size() + " of " + total + " remaining."); 
        if (allTopics.size() == 0) System.out.println("---------------------DONE----------------------"); 
        } catch (JMSException e) { 
               e.printStackTrace(); 
        } 
   }


   public static void main(String[] args) throws Exception { 
       TopicChecker tc = new TopicChecker(); 
       while (tc.allTopics.size() != 0); 
   } 
} 
0
votes

if (response.getIntParameterValue(MQConstants.MQIACF_RETAINED_PUBLICATION) > 0) { allTopics.add(response.getStringParameterValue(MQConstants.MQCA_TOPIC_STRING)); }

From the status response above, to read the topic name, you should actually use "MQCA_ADMIN_TOPIC_NAME".