2
votes
  1. I'm trying to implement MQ publish/subscribe in a C#/.Net application.

  2. I'm following the instructions in this tutorial:

    https://tekslate.com/publish-subscribe-in-websphere-mq-series/

    • a) Queue manager: QM
    • b) TCP Listener running, port: 1420
    • c) Topic name: NEWS.SPORTS.CRICKET
    • d) Subscription name also: NEWS.SPORTS.CRICKET
    • e) Topic string: NEWS/SPORTS/CRICKET
    • f) Destination queue: SportsQ
  3. I can successfully do a "test publish" in MQ Explorer. I see "message count = 1" in the subscription. I see "queue depth = 1" in SportsQ.

  4. I'm able to connect to QM and I'm able to access the topic ... but it just hangs when I do a "topic.Get(message)".

Q: Why is MQ "Get()" hanging?????

Code:

using IBM.WMQ;
using System;
using System.Collections;

namespace HelloSubscribe
{
    class Program
    {
        static void Main(string[] args)
        {
            string qmName = "QM";
            string hostName = "localhost";
            string strPort = "1420";
            string channelName = "SYSTEM.DEF.SVRCONN";
            string transport = MQC.TRANSPORT_MQSERIES_CLIENT;

            Hashtable connectionProperties = new Hashtable();
            connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
            connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
            connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);

            MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);

            string topicObject = null;
            string topicString = "NEWS/SPORTS/CRICKET";
            string subscriptionName = "NEWS.SPORTS.CRICKET";
            string topicName = "NEWS.SPORTS.CRICKET";
            int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
            MQTopic destForGet = mqQueueManager.AccessTopic(topicString, null, openOptionsForGet, null, subscriptionName);

            MQMessage messageForGet = new MQMessage();
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.WaitInterval = 1000;  // wait 60 seconds
            destForGet.Get(messageForGet, gmo);
            string msg = messageForGet.ReadLine();

            destForGet.Close();
            mqQueueManager.Disconnect();
            mqQueueManager.Close();
        }
    }
}

I'm running WebSphere MQ 7.5, using the installed version of amqmdnet.dll, and Visual Studio 2015.

2
What criteria are you using to determine the end of the message? You have a ReadLine, so did you add a return at the end of the message that was sent? The ReadLine method will block until a return is received. - jdweng
I just did a "test publish" in MQ Explorer and typed in some text. Based on my experience with reading and writing point-to-point messages, ReadLine() should be appropriate. The problem is that I never get that far - it hangs in "Get()". - paulsm4
Yes, Yes, Yes!!!! Get will also block until the end of message occurs. You do not have the Get method posted so I do not know how Get determines end. - jdweng
Try something a little simpler to make sure all the libraries are installed properly. Try the code on the following webpage: c-sharpcorner.com/article/… - jdweng
Looks like I helped somebody with a similar issue in 2015: social.msdn.microsoft.com/Forums/windowsdesktop/en-US/… - jdweng

2 Answers

3
votes

MQ Administrative Subscription

The tutorial you reference has you set up an MQ administrative subscription, which subscribes a topic string to a specific MQ queue. Any message published to a topic that matches the subscription's topic string is put to the queue named on the administrative subscription, in this case SportsQ. To read those messages, treat the destination as a queue rather than a topic and open it with the AccessQueue method, for example:

        string queueName = "SportsQ";
        int openOptionsForGet = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
        MQQueue destForGet = mqQueueManager.AccessQueue(queueName, openOptionsForGet, null, null, null);

Managed durable subscription

What the code you provided demonstrates is called a managed durable subscription. In the background, MQ creates a queue with a name like SYSTEM.MANAGED.DURABLE.<unique 16-character hex value> and subscribes it to the topic string. If you disconnect and later resume with the same subscription name, you are connected to the same managed queue and receive any messages that were published while you were disconnected.

Note that, apart from retained publications (where MQ saves the most recent publication on a topic for future subscribers), you do not receive messages that were published to a topic string before you subscribed. This is why you do not see the message you published via MQ Explorer before creating the managed durable subscription.

Based on what you provided, you should be able to publish a new message to the topic string and your application will receive it.
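
To check this without MQ Explorer, a small publisher can put a fresh message on the topic string while the subscriber is waiting in Get(). The following is only a minimal sketch: it reuses the connection values from the question, the namespace HelloPublish and the sample text are made up for illustration, and I have not run it against your queue manager.

```csharp
using System;
using System.Collections;
using IBM.WMQ;

namespace HelloPublish   // hypothetical name, not from the question
{
    class Program
    {
        static void Main(string[] args)
        {
            // Connection values copied from the question; adjust for your environment.
            Hashtable connectionProperties = new Hashtable();
            connectionProperties.Add(MQC.HOST_NAME_PROPERTY, "localhost");
            connectionProperties.Add(MQC.PORT_PROPERTY, 1420);
            connectionProperties.Add(MQC.CHANNEL_PROPERTY, "SYSTEM.DEF.SVRCONN");

            MQQueueManager mqQueueManager = new MQQueueManager("QM", connectionProperties);

            // Open the topic string for publishing instead of subscribing.
            MQTopic destForPut = mqQueueManager.AccessTopic(
                "NEWS/SPORTS/CRICKET",
                null,
                MQC.MQTOPIC_OPEN_AS_PUBLICATION,
                MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING);

            // Build a plain text message (sample text is an assumption).
            MQMessage messageForPut = new MQMessage();
            messageForPut.Format = MQC.MQFMT_STRING;
            messageForPut.WriteString("Hello from HelloPublish");

            // Every subscription whose topic string matches receives a copy.
            destForPut.Put(messageForPut);

            destForPut.Close();
            mqQueueManager.Disconnect();
        }
    }
}
```

Start the subscriber first so that the managed durable subscription exists, then run the publisher; the message should arrive within the subscriber's wait interval.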


Unmanaged durable subscription

Another option is to delete the MQ administrative subscription you created and alter your program to create an unmanaged subscription, where you provide the destination queue, for example:

        // topicString and subscriptionName are the variables from the code in the question.
        string queueName = "SportsQ";
        int QueueOpenOptionsForGet = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_FAIL_IF_QUIESCING;
        int TopicOpenOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
        MQQueue destQueue = mqQueueManager.AccessQueue(queueName, QueueOpenOptionsForGet);
        MQTopic destForGet = mqQueueManager.AccessTopic(destQueue, topicString, null, TopicOpenOptionsForGet, null, subscriptionName);

This creates an API subscription that links the topic string to the queue you provide as the destination.

Just like with the MQ administrative subscription, from then on you can either treat it as an ordinary queue to get from, or RESUME the subscription.


Resuming an unmanaged durable subscription without the need to know the queue name

You still need to call the AccessTopic overload that takes a destination as its first parameter, but pass null instead of an MQDestination. The program then resumes the already-created unmanaged subscription without needing to know the queue name. I compiled and tested the following and confirmed that it works:

using IBM.WMQ;
using System;
using System.Collections;


namespace HelloSubscribe
{
    class Program
    {
        static void Main(string[] args)
        {
            string qmName = "QM";
            string hostName = "localhost";
            string strPort = "1420";
            string channelName = "SYSTEM.DEF.SVRCONN";
            string transport = MQC.TRANSPORT_MQSERIES_CLIENT;

            Hashtable connectionProperties = new Hashtable();
            connectionProperties.Add(MQC.HOST_NAME_PROPERTY, hostName);
            connectionProperties.Add(MQC.PORT_PROPERTY, strPort);
            connectionProperties.Add(MQC.CHANNEL_PROPERTY, channelName);

            MQQueueManager mqQueueManager = new MQQueueManager(qmName, connectionProperties);

            string topicString = "NEWS/SPORTS/CRICKET";
            string subscriptionName = "NEWS.SPORTS.CRICKET";
            int openOptionsForGet = MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
            MQTopic destForGet = mqQueueManager.AccessTopic(null, topicString, null, openOptionsForGet, null, subscriptionName);

            MQMessage messageForGet = new MQMessage();
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.WaitInterval = 60000;  // wait 60 seconds
            destForGet.Get(messageForGet, gmo);
            string msg = messageForGet.ReadLine();
            System.Console.WriteLine("Received message data : " + msg);

            destForGet.Close();
            mqQueueManager.Disconnect();
            mqQueueManager.Close();
        }
    }
}

Also note that you do not even need the topicString; it can be null as well. The subscriptionName is all that is required to resume the subscription:

            MQTopic destForGet = mqQueueManager.AccessTopic(null, null, null, openOptionsForGet, null, subscriptionName);

Some other notes: the examples in the tutorial use mixed-case queue names. This is not recommended, because in many places IBM MQ will fold names to UPPER CASE if you are not careful. The recommended best practice is to use UPPER CASE for IBM MQ object names (queues, etc.).

WaitInterval is in milliseconds, so setting it to 1000 waits 1 second, not 60 seconds.
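
Related to the wait interval: when it expires with no message, Get() throws an MQException with reason code MQRC_NO_MSG_AVAILABLE (2033), so it is worth handling that case explicitly. Below is a minimal sketch of one way to wrap this; the class GetWithTimeout and its methods ToWaitIntervalMs and TryGetLine are hypothetical helper names of mine, not part of the IBM MQ classes.

```csharp
using System;
using IBM.WMQ;

// Hypothetical helper class, shown only to illustrate the idea.
static class GetWithTimeout
{
    // WaitInterval is expressed in milliseconds, so convert explicitly.
    public static int ToWaitIntervalMs(TimeSpan timeout)
    {
        return checked((int)timeout.TotalMilliseconds);
    }

    // Returns the first line of the message, or null if the wait interval
    // expired without a message arriving. Works for MQTopic and MQQueue,
    // since both derive from MQDestination.
    public static string TryGetLine(MQDestination dest, TimeSpan timeout)
    {
        MQMessage message = new MQMessage();
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
        gmo.WaitInterval = ToWaitIntervalMs(timeout);

        try
        {
            dest.Get(message, gmo);
            return message.ReadLine();
        }
        catch (MQException ex)
        {
            // No message within the wait interval: report it as "nothing received".
            if (ex.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE)
            {
                return null;
            }
            throw;  // any other reason code is a real error
        }
    }
}
```

With this in place, the Get/ReadLine pair in the program above could be replaced by something like `string msg = GetWithTimeout.TryGetLine(destForGet, TimeSpan.FromSeconds(60));`, and a null result tells you that nothing was published during the wait rather than that the call failed.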

2
votes

I got it working ... by specifying a destination queue:

    ...
    string topicString = "NEWS/SPORTS/CRICKET";
    string subscriptionName = "NEWS.SPORTS.CRICKET";
    string topicName = "NEWS.SPORTS.CRICKET";
    MQDestination unmanagedDest = mqQueueManager.AccessQueue("SportsQ", MQC.MQOO_INPUT_EXCLUSIVE | MQC.MQOO_FAIL_IF_QUIESCING);

    int openOptionsForGet = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
    // MQTopic destForGet = mqQueueManager.AccessTopic(topicName, null, openOptionsForGet, null, subscriptionName);  // Hangs in "Get()"
    // MQTopic destForGet = mqQueueManager.AccessTopic(topicName, null, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet);  // MQRC_SUB_NAME_ERROR
    // MQTopic destForGet = mqQueueManager.AccessTopic(topicName, topicString, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet);  // MQRC_SUB_NAME_ERROR
    // MQTopic destForGet = mqQueueManager.AccessTopic(topicName, topicString, openOptionsForGet, null, subscriptionName);  // Hangs in "Get()"
    // MQTopic destForGet = mqQueueManager.AccessTopic(topicName, "SportsQ", openOptionsForGet, null, subscriptionName);  // Hangs
    // MQTopic destForGet = mqQueueManager.AccessTopic(topicName, "SportsQ", MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, openOptionsForGet);  //  MQRC_SUB_NAME_ERROR
    // MQTopic destForGet = mqQueueManager.AccessTopic(unmanagedDest, topicName, null, openOptionsForGet);  // MQRC_SUB_NAME_ERROR
    MQTopic destForGet = mqQueueManager.AccessTopic(unmanagedDest, topicName, null, openOptionsForGet, null, subscriptionName);  // <-- this works!

    MQMessage messageForGet = new MQMessage();
    MQGetMessageOptions gmo = new MQGetMessageOptions();
    gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
    gmo.WaitInterval = 60000;  // wait 60 seconds (WaitInterval is in milliseconds)
    destForGet.Get(messageForGet, gmo);  // <-- No hang, if message present
    string msg = messageForGet.ReadLine();
    ...