I have an application which is installed in 3 different servers.This application subscribes to a single Event hub. This event hub has 8 partitions. So when I start my application in all the 3 machines, all partitions are initialized randomly on all the 3 machines.
Say it is like this:
VM1 : Partition 0,1,2
VM2 : Partition 3,4
VM3 : Partition 5,6,7
All these partitions are receiving messages continuously. These messages needs to be processed one after the other. Now my requirement is , with in a machine/server, I want to receive only one message at a time (no matter how many partitions are initialized). Also VM1, VM2, VM3 can run in parallel.
A scenario would be, in one machine, say VM1, I have received a message through Partition 0. That message is being processed now which typically takes say 15 mins. With in these 15 mins, I do not want either Partition 1 or 2 to receive any new messages until the earlier one is finished. Once the previous message processing is done, then either of the 3 partitions is ready for new message. Once anyone partition receives another message, other partitions should not receive any messages.
The code I'm using is something like this :
public class SimpleEventProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
}
public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
DoSomethingWithMessage(); // typically takes 15-20 mins to finish this method.
}
return context.CheckpointAsync();
}
}
Is this possible?
PS: I have to use event hubs and have no other option.