2
votes

I'm using Azure Event Hub's EventProcessorHost to process batches of events. For some reason I have to skip events by not writing a checkpoint when the thread count has reached the maximum, but I need to retrieve those skipped events after the thread count comes down. Please see the implementation below:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Process events
        foreach (var eventData in messages)
        {
            if (Process.GetCurrentProcess().Threads.Count <= 50)
            {
                // do work
                await context.CheckpointAsync(eventData);
            }
            else
            {
                // do not write a checkpoint
                break;
            }
        }
    }

This is simple and straightforward logic, but it's not working as I expected. Once the "break" line is hit, the "foreach" exits; I expected the skipped events to show up in the next call to "ProcessEventsAsync", but they never come again until the worker role recycles and re-registers the "EventProcessorHost".

I have been stuck on this issue for a few days; could someone please point out what I've missed?

Many thanks in advance!


2 Answers

0
votes

No, the events are received in sequence regardless of what you choose to do with them. So even though the foreach loop body no longer executes once you break out of it, ProcessEventsAsync will keep being called as long as you hold the lease and there are events to receive.

0
votes

What you are missing is that there are two layers here: one layer that fetches the data from the Event Hub, and a second layer that processes the data.

EventProcessorHost hides this from the developer, along with other things such as automatic partition balancing when multiple event processors are running.

So what actually happens is that the layer fetching data from the Event Hub holds internal state, which includes the last event that was fetched. Therefore it doesn't matter whether you checkpoint or not: as long as the same event processor is processing data from the same partition (i.e. it hasn't lost its partition "ownership"), it will keep receiving new data.
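To illustrate (a hedged sketch, not part of the original question's code): because the receiver's in-memory cursor is what drives delivery, the only way to force it to re-read from the last checkpoint is to tear down and re-register the processor, which is exactly why the skipped events reappear after the worker role recycles. Assuming the older Microsoft.ServiceBus.Messaging SDK the question appears to use:

```csharp
// Sketch only: "host" is an EventProcessorHost instance and
// "MyProcessor" a hypothetical IEventProcessor implementation.
// Unregistering drops the leases and closes the internal receivers;
// re-registering recreates them, so delivery resumes from the last
// written checkpoint rather than from the in-memory cursor.
await host.UnregisterEventProcessorAsync();
await host.RegisterEventProcessorAsync<MyProcessor>();
```

This is a heavy-handed way to "rewind", though, which is why buffering unprocessed events yourself is usually the better option.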

The checkpoint is actually for situations where event processor "A" loses its lease ("ownership") on partition "X" and event processor "B" (including its internal event-fetching module) takes "ownership" of partition "X". In this situation, event processor "B" needs to know the last point at which event processor "A" made an "up to here I've processed events safely" statement by calling the "checkpoint" method.

That being said, you should know that every event the event processor receives should be handled somehow. If you don't want to process events while your thread count is over 50, you should at least move those events somewhere else (another Event Hub, a queue, or any other service/storage) for later handling.
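For example (a sketch under stated assumptions: "pendingEvents" and "ProcessEvent" are hypothetical names, not part of the original code, and in production you would likely use a durable store or a secondary Event Hub rather than an in-memory queue):

```csharp
// Sketch: park events locally instead of dropping them when busy.
private readonly ConcurrentQueue<EventData> pendingEvents = new ConcurrentQueue<EventData>();

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (var eventData in messages)
    {
        if (Process.GetCurrentProcess().Threads.Count <= 50)
        {
            // Capacity is back: drain previously buffered events first,
            // then handle the current one.
            while (pendingEvents.TryDequeue(out var buffered))
                ProcessEvent(buffered);
            ProcessEvent(eventData);
        }
        else
        {
            // Too busy: buffer the event instead of losing it.
            pendingEvents.Enqueue(eventData);
        }
    }

    // Safe to checkpoint the batch: every event was either handled or buffered.
    await context.CheckpointAsync();
}
```

Note that once you checkpoint, a buffered event only survives as long as this process does, which is why a durable destination is the safer choice.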