I have the following scenario.
I have a queue that I receive messages from. When a message comes in, a new thread is created to deal with it.
This thread takes the message and adds it to a collection. It then checks whether the collection contains 100 items and, if it does, sends them elsewhere and clears the collection.
I can't use a regular List because I get "collection was modified; enumeration can't continue" errors. So I need to use a thread-safe collection.
My worry, though, is this: one thread adds the 100th item, and while it is sending the items off elsewhere, another thread adds to the collection, making it 101 items. The thread that added the 100th item then clears the collection, and I lose an item.
I can't use a ConcurrentBag, because it has no Clear method. I also can't iterate the bag and remove items one by one, because messages might be added faster than they can be removed, and the loop would never end.
ConcurrentStack has a Clear method, but would that work in this scenario?
Here is some code to demonstrate what I mean. HandleMeasurementMessage runs on a new thread for each message.
private static readonly ConcurrentStack<EventHubDatum> EventHubDataBatch = new ConcurrentStack<EventHubDatum>();

private static void HandleMeasurementMessage(IMessage<MessageEnvelope> msg)
{
    /* Do a bunch of stuff to msg, producing eventHubDatum */
    EventHubDataBatch.Push(eventHubDatum);
    if (EventHubDataBatch.Count == 100)
    {
        /* Send them off */
        EventHubDataBatch.Clear();
    }
}
Weirdly, the enumeration-modified problem only occurs when I'm not running the program via the debugger in VS2015. Under the debugger, the program runs for an hour or so just fine. If I turn off the debugger, I get these enumeration errors, which is why I'm trying to switch to a thread-safe collection. I'm just not sure which one is appropriate.
Code that calls HandleMeasurementMessage:
_busSingle.Consume<MessageEnvelope>(_queueMeasurement, (msg, MessageReceivedInfo) => Task.Factory.StartNew(() =>
{
    try
    {
        HandleMeasurementMessage(msg);
    }
    catch (Exception ex)
    {
        /* Logging stuff */
    }
}));
lock, no need for concurrent collections here – Evk
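The lock-based approach suggested in the comment above could look roughly like the sketch below. It is an illustration under assumptions, not the original poster's code: the MeasurementBatcher class, the SendBatch hook, and the empty EventHubDatum stand-in are all hypothetical names introduced here. The idea is to add the item, check the count, and swap in a fresh list all inside one lock, so no item can arrive between the check and the clear.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the EventHubDatum type from the question.
public sealed class EventHubDatum { }

// Hypothetical helper; the name and shape are assumptions for illustration.
public static class MeasurementBatcher
{
    private const int BatchSize = 100;
    private static readonly object BatchLock = new object();
    private static List<EventHubDatum> _batch = new List<EventHubDatum>(BatchSize);

    // Hypothetical hook for the "send them off" step; replace with the real sender.
    public static Action<IReadOnlyList<EventHubDatum>> SendBatch = _ => { };

    public static void Add(EventHubDatum datum)
    {
        List<EventHubDatum> full = null;

        lock (BatchLock)
        {
            _batch.Add(datum);
            if (_batch.Count >= BatchSize)
            {
                // Hand the full list to this thread and start a fresh one,
                // so items added later belong to the next batch.
                full = _batch;
                _batch = new List<EventHubDatum>(BatchSize);
            }
        }

        // Send outside the lock so slow I/O does not block other handlers.
        if (full != null)
        {
            SendBatch(full);
        }
    }
}
```

With something like this in place, the Push/Count/Clear sequence in HandleMeasurementMessage would reduce to a single call such as MeasurementBatcher.Add(eventHubDatum). Because the full list is detached before it is sent, the sending thread is the only one that ever enumerates it, which should also avoid the "collection was modified" error.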