2
votes

I have a WCF service that creates a private message in MSMQ. The messages are being created fine, and I can see the messages that my client application is creating.

I also have a MSMQ listener service that seems to run fine. When I start the service up, it appears to successfully "process" the messages in the queue and they are removed. However, the implementation of my listener service doesn't seem to get executed.

I'm fairly new to MSMQ, and I have no idea why the messages are being removed from the queue, and why the code in my listener method is not getting executed.

Below are my service classes...

[ServiceContract(Namespace = ServiceConstants.NAMESPACE, Name = "IOrderService")]
public interface IOrderQueueProcessingService
{

    [OperationContract(IsOneWay = true, Action = "*")]
    void ProcessOrderQueue(MsmqMessage<string> message);

}

public abstract class OrderQueueProcessingServiceBase : ServiceBase, IOrderQueueProcessingService
{
    #region CONSTRUCTORS

    protected OrderQueueProcessingServiceBase() { }

    protected OrderQueueProcessingServiceBase(List<EventRecord> existingList) : base(existingList) { }

    #endregion //CONSTRUCTORS

    #region IOrderQueueProcessingService Members


    [OperationBehavior(TransactionScopeRequired = false, TransactionAutoComplete = true)]
    public virtual void ProcessOrderQueue(MsmqMessage<string> message)
    {
        throw new NotImplementedException();
    }

    #endregion
}

public class OrderQueueProcessingService : OrderQueueProcessingServiceBase
{
    #region Constructors


    public OrderQueueProcessingService() {}

    public OrderQueueProcessingService(List<EventRecord> existingList) : base(existingList) { }


    #endregion

    /// <summary>
    ///     Processes any Orders in the Orders Queue
    /// </summary>
    /// <param name="message"></param>
    public override void ProcessOrderQueue(MsmqMessage<string> message)
    {

        var q = new MessageQueue(@".\private$\msmqdemo/submitorderservice.svc");
        q.Send("hey");
        /*
        using (new Tracer("OrderQueueProcessingService"))
        {
            // add data context to work with.
            using (var unitOfWork = new TLFDataContext())
            {

                var newOrderLines = new List<OrderLineDataContract>
                                {
                                    new OrderLineDataContract
                                        {
                                            C = "test",
                                            IC = "msw",
                                            Qty = 1,
                                            T = "volume" ,
                                            ED = DateTime.UtcNow.AddDays(5)
                                        }
                                };

                var newOrder = new OrderDataContract
                {
                    LIs = newOrderLines.AsEnumerable(),
                    PId = 9323,
                    POId = 8686,
                    S = "new"

                };
                var orderService = new OrderService();
                var createdOrder = orderService.CreateOrder(null, null, newOrder);
                //unitOfWork.SubmitUnitOfWork();

                //return null;
            }
        }*/


    }

}

I commented out the code that I am eventually trying to execute, and replaced it with a simple MSMQ message send, for testing. This seems like it should work fine. Any help would be greatly appreciated.

Config settings below...

<service name="ServiceImplementation.OrderQueueProcessingService">
    <host>
      <baseAddresses>
        <add baseAddress="https://localhost:9000/OrderQueueProcessingService.svc" />
      </baseAddresses>
    </host>
    <endpoint address="net.msmq://localhost/private/testingqueue/OrderQueueProcessingService.svc" binding="netMsmqBinding" bindingConfiguration="MsmqBindingNonTransactionalNoSecurity" contract="IOrderQueueProcessingService" />
  </service>
1
What is the .svclog telling you? Have you hit the endpoint through your browser? Does that work? Have you hit the endpoint using the WCF Test Client? Does that work?Didaxis
Also, if you don't have a .svclog or any other type of logging in place, AND you've set things up to be non-transactional, you are very likely the victim of "silent errors." how do you know if your service is throwing exceptions or not? Hint: you need to get some visibility into your service!Didaxis
Agree, with non-transactional queues the messages are just gone. You could also enable journaling: put deadLetterQueue="System" into your binding configurationtom redfern
Thank you for your responses. I do not have .svclog's implemented. One way that I did test it was by adding an EventLog in my service constructor. I'm using a self host console app to test locally, and the service is instantiated, and opened without issues. If I leave it running, and add messages to the queue, they are immediately removed from the queue, and put into the Journal messages. I also, added an EventLog in my ProcessOrderQueue method, and that is not getting hit. So my operation (method) never gets called. I can hit the service using a browser. Will try testing with a Test Client.Michael Leanos
BTW, my EventLog in the constructor does in fact create the log.Michael Leanos

1 Answers

0
votes

I was able to figure out my issue. I didn't have any event handling for the QueueOnPeekComplted event. I added that to an Initialize method, and was able to successfully process my messages. I also added handling of messages that could not be processed. Below is my new implementation of my OrderQueueProcessingService.

public class OrderQueueProcessingService : OrderQueueProcessingServiceBase, IDisposable
{
    #region Constructors

    public OrderQueueProcessingService()
    {
        Initialize(ConfigurationManager.AppSettings["OrderQueueProcessingQueueName"]);
    }

    public OrderQueueProcessingService(List<EventRecord> existingList) : base(existingList) {}

    #endregion

    #region Properties

    private MessageQueue Queue { get; set; }

    #endregion

    #region IDisposable Members

    public new void Dispose()
    {
        if (Queue == null) return;

        //unsubscribe and dispose
        Queue.PeekCompleted -= QueueOnPeekCompleted;
        Queue.Dispose();
    }

    #endregion

    private void Initialize(string queueName)
    {
        Queue = new MessageQueue(queueName, false, true, QueueAccessMode.Receive)
                    {
                            Formatter = new XmlMessageFormatter(new[] {typeof (OrderQueueDataContract)})
                    };

        //setup events and start.
        Queue.PeekCompleted += QueueOnPeekCompleted;
        Queue.BeginPeek();
    }

    private static void MoveMessageToDeadLetter(IDisposable message)
    {
        var q = new MessageQueue(ConfigurationManager.AppSettings["OrderProcessingDLQ"], QueueAccessMode.Send)
                    {
                            Formatter = new XmlMessageFormatter(new[] {typeof (OrderQueueDataContract)})
                    };
        q.Send(message, MessageQueueTransactionType.Single);
        q.Dispose();
    }

    /// <summary>
    ///     Processes the specified Order message
    /// </summary>
    /// <param name="orderMessage"></param>
    public override void ProcessOrderQueue(OrderQueueDataContract orderMessage)
    {
        using (var unitOfWork = new MyDataContext())
        {
            switch (orderMessage.M.ToLower())
            {
                case "create":
                    DataAccessLayer.CreateOrder(unitOfWork, orderMessage.O.TranslateToBe());
                    break;
                default:
                    break;
            }
        }
    }

    private void QueueOnPeekCompleted(object sender, PeekCompletedEventArgs peekCompletedEventArgs)
    {
        var asyncQueue = (MessageQueue) sender;

        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            try
            {
                using (var message = asyncQueue.ReceiveById(peekCompletedEventArgs.Message.Id, TimeSpan.FromSeconds(30), transaction))
                {
                    if (message != null) ProcessOrderQueue((OrderQueueDataContract) message.Body);
                }
            }
            catch (InvalidOperationException ex)
            {
                transaction.Abort();
            }
            catch (Exception ex)
            {
                transaction.Abort();
            }

            if (transaction.Status != MessageQueueTransactionStatus.Aborted) transaction.Commit();
            else
            {
                using (var message = asyncQueue.ReceiveById(peekCompletedEventArgs.Message.Id, TimeSpan.FromSeconds(30), transaction))
                {
                    if (message != null)
                    {
                        MoveMessageToDeadLetter(message);
                        message.Dispose();
                    }
                }

                EventLog.WriteEntry("OrderQueueProcessingService", "Could not process message: " + peekCompletedEventArgs.Message.Id);
            }
            transaction.Dispose();
        }

        asyncQueue.EndPeek(peekCompletedEventArgs.AsyncResult);
        asyncQueue.BeginPeek();
    }
}

Does anyone see any issues with this implementation? I had to fiddle with it quite a bit, but it has passed unit testing, and process testing.

One thing that I noticed is that when my service first starts up, it processes all the messages in the queue; which could potentially hold up the starting of my other services. This may only be an issue when I'm starting them in my console app while testing. To be safe I start this service last.