7
votes

so this is a fairly broad question but have run out of ideas. We are currently running 2 worker role instances that does the following :

  • Monitors and processes IoT Hub events by spawning N threads for each batch.
  • Monitors and processes Connect/Disconnect (Operations monitoring) messages from IoT Hub
  • Does some Service bus work (topics and queues)
  • Writes to SQL, DocDB (Mongo API) and Azure table storage for logging via NLOG
  • Sends Cloud to Device message via IoT Hub

The problem we face is during high peaks our CPU obviously increases but sadly never comes back down and often will shoot up to 100% and sit there until I restart the instances to get it back down. I keep looking into the threads as I still feel it could be related to a "while(1)" type scenario even though cant see why. Lets get into the code now...

In the WorkerRole.cs:

    class WorkerRole : RoleEntryPoint
    {
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

        public override void Run()
        {
            _eventprocessor.Start(instanceId, instanceIndex);//.Wait(-1);

            //Wait for shutdown to be called, else the role will recycle
            this.runCompleteEvent.WaitOne();
        }
    }

In EventProcessor.cs: I will try leave out a lot of the juice but add what I feel might be worthy. Will add "pseudo code" where possible.

public class EventProcessor : IEventProcessor
{
  private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

  public async Task Start(string serviceId, int InstanceIndex)
  {

    //Setup Topic

    //Setup Queue

    //Setup EventProcessorHost for receiving events and operations monitoring and start listening

    //Receiving cloud to device feedback from service
    ReceiveFeedbackAsync();

    runCompleteEvent.WaitOne();
  }

  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
  {
        if (messages.Count() > 0)
        {
            if (!_cancellationSource.IsCancellationRequested)
            {
                await ProcessEventsBulk(context, messages);
            }
        }

        if (messages.Count() > 0)
        {
            await context.CheckpointAsync();               
        }
   }

  async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
        {
            List<Task> TaskList = new List<Task>();
            foreach (EventData message in messages)
            {
                var LastTask = Task.Run(() => GoBoy(context, message));
                TaskList.Add(LastTask);
            }
            await Task.WhenAll(TaskList);
        }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        try
        {
            using (var db = new AppDbContext(_dbContextConnectionString))
            {
                await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
                await db.SaveChangesAsync();
            }
        }
        catch (Exception e)
        {
           //Do Some stuff...
        }
    }

  private async void ReceiveFeedbackAsync()
    {
        var feedbackReceiver = serviceClientReceiver.GetFeedbackReceiver();
        while (true)
        {
            try
            {
              var feedbackBatch = await feedbackReceiver.ReceiveAsync();
              if (feedbackBatch == null) continue;
              foreach (var records in feedbackBatch.Records)
              {

              }
              await feedbackReceiver.CompleteAsync(feedbackBatch);
            }
            catch (Exception)
            {
              Thread.Sleep(30000);                    
            }
         }

    }

}

If there is anything extra anyone needs please dont hesitate to ask. I really really appreciate any help.

Here shows the CPU drop once I restarted the workers enter image description here

Microsoft support assisted with asking me to do some PerfViews and some ProcDumps. The outcome was that we should look into the thread calling our hub "https://abcxyz.azure-devices.net:443/$iothub/websocket". This is why I decided to add the ReceiveFeedbackAsync() method as I know that relies on being permanently connected to our hub to gather feedback.

From what I can see we are registering to our EVPH correctly but let me know if anyone would like to view that code as well.

1
If your feedbackReceiver keeps returning null due to some condition, you have a perfect 'while(true);' loop.Ton Plooij
@TonPlooij thanks for the reply, I did consider this except that its the recommended solution. docs.microsoft.com/en-us/azure/iot-hub/…. Look under Receive delivery feedback section.David
Some ideas: checkpoint less, like every 1000 messages or so, or based on a timer interval of, say one minute. Get rid of the ORM and use plain ado.net. Depending on the amount of messages received this might help. Especially the ORM can hurt performance with its conversions to an object.Peter Bons
@PeterBons Thanks for the suggestion!! I have changed it now to checkpoint after a minute on each partitionId, lets see how it goes tonight during peak hours. UPDATE : Forgot to click add comment but wow CPU is now MUCH lower during off peak so lets see. So far very happy with the results. Will keep everyone up to date.David
@David Within the same loop that Peter mentioned, you are catching the Exception, sleeping and then retrying. This could also potentially cause a while(true) loop, because each bad message will throw an exception and then thread sleep for 30 seconds before retrying. I think you should set a retry counter and if it is exceeded break out of the loop within the catch. I think along with Peter's suggestion your issue should be solved.ObiEff

1 Answers

0
votes

Did you step through the code and ensure you aren't creating an infinite loop condition that's not throwing any exceptions so your Thead.Sleep to execute. Since you're expecting to Sleep in your code, it's best to avoid using an Exception to trigger that. Perhaps code it to Sleep after each batch of feedback being processed. Exception are for error handling and exceptional circumstances, not to help control logic flow.