3
votes

I have the following scenario.

I have a queue that I receive messages from. When a message comes in a new thread is created to deal with it.

This thread takes the message and adds it to a collection. It then checks if the collection contains 100 items and if it does it sends them elsewhere and clears the collection.

I can't use a regular list because I get "Collection was modified; enumeration operation may not execute" errors. So I need to use a thread-safe collection.

My worry, though, is this: one thread adds the 100th item and, while it is sending the batch elsewhere, another thread adds to the collection, making it 101 items. The thread that triggered the send then clears the collection and I lose an item.

I can't use a ConcurrentBag because it has no Clear method. I also can't iterate over the bag and remove items one by one, because messages might be added faster than they can be removed, so the loop would never end.

ConcurrentStack has a clear, but would that work in this scenario?

Here is some code to demonstrate what I mean. HandleMeasurementMessage runs on a new thread for each message.

private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    EventHubDataBatch.Push(eventHubDatum);

    if (EventHubDataBatch.Count == 100)
    {
        /* Send them off */
        EventHubDataBatch.Clear();
    }
}

Weirdly, the enumeration errors only occur when I'm not running the program under the debugger in VS2015. Under the debugger, it runs for an hour or so just fine. If I turn off the debugger, I get these enumeration errors, which is why I'm trying to switch to a thread-safe collection. I'm just not sure which one is appropriate.

Code that calls HandleMeasurementMessage

_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
{
    try
    {
        HandleMeasurementMessage(msg);
    }
    catch (Exception ex)
    {
        /* Logging stuff */
    }
}));
There are plenty of synchronization mechanisms available... have you researched thread synchronization in .NET? - rory.ap
@rory.ap, yes, I have been reading the .NET docs but it's a bit confusing. Hence the question here. - Stuart
Your question needs to talk about that then. What research have you done (include links)? What topics has it revealed, and why hasn't that information helped your current problem, or what don't you understand about it? - rory.ap
"When a message comes in a new thread is created to deal with it." Do I read this as it says? If you receive 100 messages, you'll have 100 threads? Where do the messages come from? - Jeroen van Langen
Just use a simple lock, no need for concurrent collections here. - Evk

2 Answers

3
votes

I would just use a simple lock, like this:

private static readonly List<EventHubDatum> EventHubDataBatch = new List<EventHubDatum>();        
private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    EventHubDatum[] toSend = null;
    lock (EventHubDataBatch) {
        EventHubDataBatch.Add(eventHubDatum);

        if (EventHubDataBatch.Count == 100) {
            // copy to local
            toSend = EventHubDataBatch.ToArray();
            EventHubDataBatch.Clear();
        }
    }

    if (toSend != null) {
        /* Send them off*/
    }
}

Locking here is very brief, so it should not affect performance in any noticeable way in your case. Note that when the list reaches 100 items, we copy them to a local array and clear the source list while still inside the lock, then send after leaving it. That way the lock is not held for the duration of the "send them off" operation, which might take a long time.
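
If you want to keep the locking out of the handler, the same idea can be wrapped in a small class. This is only a minimal sketch: `Batcher<T>`, `_gate` and `Add` are hypothetical names, not part of your code or of any library. It assumes you are happy to hand the full list to the caller and start a fresh one, rather than copying to an array and clearing. It also locks on a private object instead of the list itself, so no other code can take the same lock by accident.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only: collects items and hands back
// a full batch exactly once per batchSize additions.
public sealed class Batcher<T>
{
    private readonly object _gate = new object(); // private lock object
    private readonly int _batchSize;
    private List<T> _items;

    public Batcher(int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
        _items = new List<T>(batchSize);
    }

    // Returns the completed batch when this call fills it, otherwise null.
    public List<T> Add(T item)
    {
        lock (_gate)
        {
            _items.Add(item);
            if (_items.Count < _batchSize) return null;

            // Swap in a fresh list; the caller now owns the full one,
            // so no other thread can touch it while it is being sent.
            List<T> full = _items;
            _items = new List<T>(_batchSize);
            return full;
        }
    }
}
```

In the handler you would then call something like `var batch = Batcher.Add(eventHubDatum);` on a shared static instance and only send when `batch != null`. The send still happens outside the lock, as in the code above.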

0
votes

Use a synchronisation object like AutoResetEvent to only allow one thread to access the collection at a time.

Example usage:

static AutoResetEvent MeasureMessageEvent = new AutoResetEvent(true);

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg */

    // Wait for exclusive access
    MeasureMessageEvent.WaitOne();
    try
    {
        EventHubDataBatch.Push(eventHubDatum);

        if (EventHubDataBatch.Count == 100)
        {
            /* Send them off */
            EventHubDataBatch.Clear();
        }
    }
    finally
    {
        // Release exclusive access, even if an exception was thrown
        MeasureMessageEvent.Set();
    }
}
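
Note that the example above holds the event while the batch is being sent, so every other message thread waits until the send finishes. If that is a concern, one possible variation is to take a snapshot while holding the event and send after releasing it. The sketch below is an assumption-laden illustration, not a drop-in replacement: `EventGuardedBatch<T>` and its members are hypothetical names, and it uses a plain `List<T>` because the event already serializes access.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, for illustration only.
public sealed class EventGuardedBatch<T>
{
    // Initially signaled, so the first caller gets straight through WaitOne().
    private readonly AutoResetEvent _gate = new AutoResetEvent(true);
    private readonly List<T> _items = new List<T>();
    private readonly int _batchSize;

    public EventGuardedBatch(int batchSize)
    {
        _batchSize = batchSize;
    }

    // Returns a snapshot of the batch when this call fills it, otherwise null.
    public T[] Add(T item)
    {
        _gate.WaitOne(); // wait for exclusive access
        try
        {
            _items.Add(item);
            if (_items.Count < _batchSize) return null;

            T[] snapshot = _items.ToArray();
            _items.Clear();
            return snapshot;
        }
        finally
        {
            _gate.Set(); // always release, even if an exception is thrown
        }
    }
}
```

The caller sends the returned array only when it is not null, at which point the event has already been released.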