2
votes

According to the documentation, EventProcessorClient is the recommended way to consume Event Hubs streams:

EventProcessorClient should be used for reading events from all partitions in a production scenario, as it offers a much more robust experience with higher throughput.

EventProcessorClient only lets you update the consumer's checkpoint from inside the event handler, i.e. per event, something like:

var eventProcessorClient = new EventProcessorClient(...);
eventProcessorClient.ProcessEventAsync += ProcessEventHandler;
// A ProcessErrorAsync handler must also be registered, or StartProcessingAsync throws.
await eventProcessorClient.StartProcessingAsync(stoppingToken);

...

static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
    Console.WriteLine("Received event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}

At the same time, the Event Hubs documentation says:

Updating after each successfully processed event can have performance and cost implications as it triggers a write operation to the underlying checkpoint store. Also, checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus queue might be a better option than an event hub. The idea behind Event Hubs is that you get "at least once" delivery at great scale.

How can I update the partition checkpoint efficiently? Should I use a Kafka client instead of Azure.Messaging.EventHubs?

2 Answers

2
votes

Checkpointing after each message is not a good idea. I'd recommend checkpointing after each batch of processed messages, for instance after every 50 messages (the right number depends on your message volume and on how much reprocessing you can tolerate after a restart).
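
A minimal sketch of that count-based approach, assuming a small helper that counts per partition (checkpoints are stored per partition, so one shared counter would not map cleanly to them). `CheckpointCounter` and `ShouldCheckpoint` are hypothetical names for illustration, not part of Azure.Messaging.EventHubs:

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: counts processed events per partition and signals
// when a checkpoint is due. Not part of Azure.Messaging.EventHubs.
public sealed class CheckpointCounter
{
    private readonly int batchSize;
    private readonly ConcurrentDictionary<string, int> counts =
        new ConcurrentDictionary<string, int>();

    public CheckpointCounter(int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        this.batchSize = batchSize;
    }

    // Call once per successfully processed event.
    // Returns true on every batchSize-th event of the given partition.
    public bool ShouldCheckpoint(string partitionId)
    {
        int count = this.counts.AddOrUpdate(partitionId, 1, (_, current) => current + 1);
        if (count < this.batchSize) return false;
        this.counts[partitionId] = 0; // start the next batch
        return true;
    }
}

// Assumed usage inside the event handler from the question:
//     if (counter.ShouldCheckpoint(eventArgs.Partition.PartitionId))
//         await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
```

The reset is not atomic with the increment. That is acceptable only under the assumption that events of a single partition are handled one at a time, which is how the processor dispatches them by default.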

Beyond that, think about how you handle messages that are processed twice. For instance, if your EventProcessorClient crashes, it restarts reading from the event hub at the last checkpointed position, so you will read some messages that you may already have processed. You need a mechanism that can handle those duplicates.
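
One possible mechanism, sketched under assumptions: remember the highest sequence number already processed for each partition and skip anything at or below it. `ProcessedEventTracker` is a hypothetical name, and the in-memory dictionary only stands in for a durable store. To survive a crash, the last sequence number would have to be persisted together with the results of your processing.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical duplicate filter. The dictionary is a stand-in for durable
// storage; in-memory state alone is lost on exactly the crash it guards against.
public sealed class ProcessedEventTracker
{
    private readonly Dictionary<string, long> lastSequence = new Dictionary<string, long>();
    private readonly object gate = new object();

    // Returns true and records the event if it is new for its partition;
    // returns false if this sequence number (or a later one) was already seen.
    public bool TryMarkProcessed(string partitionId, long sequenceNumber)
    {
        lock (this.gate)
        {
            if (this.lastSequence.TryGetValue(partitionId, out long last) && sequenceNumber <= last)
            {
                return false;
            }

            this.lastSequence[partitionId] = sequenceNumber;
            return true;
        }
    }
}

// Assumed usage inside the event handler:
//     if (!tracker.TryMarkProcessed(eventArgs.Partition.PartitionId, eventArgs.Data.SequenceNumber))
//         return; // duplicate delivered after a restart, skip it
```

This relies on sequence numbers increasing within a partition and on events of one partition arriving in order. If your business logic is naturally idempotent (for example, an upsert keyed by an event ID), you may not need a filter like this at all.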

2
votes

A timer that checkpoints only when something new has been processed may work better than a message count.

With a message count, if no new events arrive for a long time, the last few processed events never get checkpointed.

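using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

public class EventListener
{
    private readonly Timer updateCheckpointTimer;

    // Latest processed event per partition. Checkpoints are stored per partition,
    // so a single shared field would only ever checkpoint one of them.
    private readonly ConcurrentDictionary<string, ProcessEventArgs> lastEventArgs =
        new ConcurrentDictionary<string, ProcessEventArgs>();

    public EventListener(...)
    {
        ...
        this.updateCheckpointTimer = new Timer(this.UpdateCheckpoint, null, TimeSpan.Zero, TimeSpan.FromSeconds(7));
    }

    public async Task ProcessEventAsync(ProcessEventArgs args)
    {
        // business logic
        ...

        this.lastEventArgs[args.Partition.PartitionId] = args;
    }

    // TimerCallback forces async void, so exceptions must be caught here;
    // an unhandled one would take down the process.
    private async void UpdateCheckpoint(object state)
    {
        foreach (var partitionId in this.lastEventArgs.Keys)
        {
            // Remove first so a partition with no new events is not checkpointed again.
            if (this.lastEventArgs.TryRemove(partitionId, out var args))
            {
                try
                {
                    await args.UpdateCheckpointAsync(args.CancellationToken);
                }
                catch (Exception)
                {
                    // Log the failure; a later tick checkpoints a newer event.
                }
            }
        }
    }
}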