3
votes

I am using Azure Event Hub and plan to send events with SendBatchAsync. I see that Event Hub has a 256 KB limit, whether events are sent individually or in a batch.

So, if my data is larger than 256 KB, what is the best practice for handling this? Should we instead send messages individually (which would guarantee that each one is under 256 KB)?

Also, how would one split the events into 256 KB chunks and send them to Event Hub? I looked at the Azure documentation and see that it recommends EventHubClient.CreateBatch, but I can't find enough examples. Can someone provide a mockup, a sample, or the steps needed to split the data into batches of at most 256 KB and send them with SendBatchAsync?

Here is what I do now (it doesn't account for the 256 KB limit):

await myEventHubClient.SendBatchAsync(
    events.Select( 
        iEvent =>
            new EventData(
                Encoding.UTF8.GetBytes(JsonConvert.SerializeObject( iEvent ) ) ) ) );
Maybe it can be done with Event Hub alone (someone else can probably answer that), but the classic way of dealing with this problem is to store the data separately and send only an ID in the event. In Azure you could, for instance, store your data in a bucket and download it by ID wherever the event is processed. - alun
How big is a single message in your scenario? I am a bit confused by a contradiction in your post: you say a single message is guaranteed to be < 256 KB, but then you talk about splitting a single message into blocks of 256 KB. Hence my question. - Peter Bons
@Peter Bons - I am reading from a Blob/CSV file that arrives in our storage, and I push each line of it to EH as one event. The blob might be 8 KB, 256 KB, or even 500 KB, and I want my design to account for this limitation. Right now I send these lines to EH with SendBatchAsync. If I sent them individually (reading every line and calling SendAsync), I would not hit this limit, because one line of CSV is much smaller than 256 KB. Hope this clarifies. - khar

3 Answers

4
votes

Don't try to chunk a single payload across multiple messages. It becomes extremely difficult to track and reassemble, because events are not guaranteed to arrive in order, may end up in different partitions, may arrive some time apart, and can be removed from the "bus" independently of one another. This gets even more challenging at scale.

Option 1: If your payload is text/JSON and your message size is not much bigger than 256 KB, consider compressing the payload. You can get an idea of the resulting size from this online tool. If your payload contains a lot of whitespace (e.g. formatted JSON), you can also minify it instead of compressing it.
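As a rough illustration of Option 1, here is a minimal sketch that gzip-compresses a JSON string with the standard System.IO.Compression classes and decompresses it again on the consumer side. The class name PayloadCompression and the constant MaxEventSizeInBytes are made up for this example and are not part of any Event Hubs SDK. The compressed bytes would become the event body, and you would still need to check the size before sending, since event properties and protocol overhead also count toward the limit.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical helper, not part of any Azure SDK.
public static class PayloadCompression
{
    // The 256 KB limit mentioned in the question (assumed value for this sketch).
    public const int MaxEventSizeInBytes = 256 * 1024;

    // Producer side: UTF-8 encode the JSON and gzip it.
    public static byte[] Compress(string json)
    {
        var raw = Encoding.UTF8.GetBytes(json);
        using (var output = new MemoryStream())
        {
            // leaveOpen keeps the MemoryStream usable after the gzip stream is flushed and disposed.
            using (var gzip = new GZipStream(output, CompressionLevel.Optimal, leaveOpen: true))
            {
                gzip.Write(raw, 0, raw.Length);
            }
            return output.ToArray();
        }
    }

    // Consumer side: gunzip and decode back to the original JSON string.
    public static string Decompress(byte[] compressed)
    {
        using (var input = new MemoryStream(compressed))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var reader = new StreamReader(gzip, Encoding.UTF8))
        {
            return reader.ReadToEnd();
        }
    }
}
```

How much this helps depends entirely on the data: repetitive text such as CSV or JSON usually shrinks a lot, while already-compressed content will not.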

Option 2: Instead, store your payload in an external store (say DocumentDB or Blob Storage).

Send a reference to your payload in the event. This keeps your events lean, but your consumer needs to know how to retrieve the payload. An easy approach is to present the link to the payload as a URI. You will also need to consider authentication and authorisation on the payload store, ideally in line with your Event Hub access policy.
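To make Option 2 concrete, here is a small sketch of the idea with the storage hidden behind an interface. Everything in it is hypothetical: IPayloadStore, InMemoryPayloadStore, ClaimCheck, and the payloads.example host are invented for illustration, and the in-memory dictionary merely stands in for Blob Storage or DocumentDB. The producer saves the large payload and puts only the returned URI into the event body; the consumer reads the URI from the event and loads the payload.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical abstraction over an external store (Blob Storage, DocumentDB, ...).
public interface IPayloadStore
{
    Uri Save(byte[] payload);
    byte[] Load(Uri reference);
}

// In-memory stand-in so the sketch runs without any Azure dependency.
public sealed class InMemoryPayloadStore : IPayloadStore
{
    private readonly Dictionary<Uri, byte[]> _items = new Dictionary<Uri, byte[]>();

    public Uri Save(byte[] payload)
    {
        // A real store would return the blob or document URI here.
        var reference = new Uri("https://payloads.example/" + Guid.NewGuid().ToString("N"));
        _items[reference] = payload;
        return reference;
    }

    public byte[] Load(Uri reference)
    {
        return _items[reference];
    }
}

public static class ClaimCheck
{
    // Producer side: store the payload, return the small event body (the URI as UTF-8).
    public static byte[] ToEventBody(byte[] payload, IPayloadStore store)
    {
        var reference = store.Save(payload);
        return Encoding.UTF8.GetBytes(reference.AbsoluteUri);
    }

    // Consumer side: read the URI from the event body and fetch the payload.
    public static byte[] FromEventBody(byte[] eventBody, IPayloadStore store)
    {
        var reference = new Uri(Encoding.UTF8.GetString(eventBody));
        return store.Load(reference);
    }
}
```

The bytes returned by ToEventBody are what you would wrap in an EventData. With this approach the event stays well under the limit no matter how large the payload is, at the cost of an extra read on the consumer side.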

0
votes

Here is a sample showing how to send batches without exceeding the 256 KB limit. The code comes from this repo (paolosalvatori/ServiceBusExtensions).

/// <summary>
/// This class contains extension methods for the <see cref="EventHubClient"/> class.
/// </summary>
public static class EventHubClientExtensions
{
    private const string EventDataListCannotBeNullOrEmpty = "The eventDataEnumerable parameter cannot be null or empty.";
    private const string SendPartitionedBatchFormat = "[EventHubClient.SendPartitionedBatch] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const string SendPartitionedBatchAsyncFormat = "[EventHubClient.SendPartitionedBatchAsync] Batch Sent: BatchSizeInBytes=[{0}] MessageCount=[{1}]";
    private const int MaxBathSizeInBytes = 262144;

    /// <summary>
    /// Asynchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param>
    /// <param name="messages">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    /// <returns>The asynchronous operation.</returns>
    public static async Task SendPartitionedBatchAsync(this EventHubClient eventHubClient, IEnumerable<EventData> messages, bool trace = false)
    {
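        // Check for null before enumerating; otherwise ToList() throws first.
        if (messages == null)
        {
            throw new ArgumentNullException(nameof(messages), EventDataListCannotBeNullOrEmpty);
        }
        var eventDataList = messages as IList<EventData> ?? messages.ToList();
        if (!eventDataList.Any())
        {
            throw new ArgumentException(EventDataListCannotBeNullOrEmpty, nameof(messages));
        }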

        var batchList = new List<EventData>();
        long batchSize = 0;

        foreach (var eventData in eventDataList)
        {
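            // Only flush a non-empty batch, so an oversized first event never triggers an empty send
            if (batchList.Count > 0 && (batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes)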
            {
                // Send current batch
                await eventHubClient.SendBatchAsync(batchList);
                Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));

                // Initialize a new batch
                batchList = new List<EventData> { eventData };
                batchSize = eventData.SerializedSizeInBytes;
            }
            else
            {
                // Add the EventData to the current batch
                batchList.Add(eventData);
                batchSize += eventData.SerializedSizeInBytes;
            }
        }
        // The final batch is sent outside of the loop
        await eventHubClient.SendBatchAsync(batchList);
        Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchAsyncFormat, batchSize, batchList.Count));
    }

    /// <summary>
    /// Asynchronously sends a batch of event data to the same partition.
    /// All the event data in the batch need to have the same value in the PartitionKey property.
    /// If the batch size is greater than the maximum batch size, 
    /// the method partitions the original batch into multiple batches, 
    /// each smaller in size than the maximum batch size.
    /// </summary>
    /// <param name="eventHubClient">The current <see cref="EventHubClient"/> object.</param>
    /// <param name="messages">An IEnumerable object containing event data instances.</param>
    /// <param name="trace">true to cause a message to be written; otherwise, false.</param>
    public static void SendPartitionedBatch(this EventHubClient eventHubClient, IEnumerable<EventData> messages,
        bool trace = false)
    {
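        // Check for null before enumerating; otherwise ToList() throws first.
        if (messages == null)
        {
            throw new ArgumentNullException(nameof(messages), EventDataListCannotBeNullOrEmpty);
        }
        var eventDataList = messages as IList<EventData> ?? messages.ToList();
        if (!eventDataList.Any())
        {
            throw new ArgumentException(EventDataListCannotBeNullOrEmpty, nameof(messages));
        }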

        var batchList = new List<EventData>();
        long batchSize = 0;

        foreach (var eventData in eventDataList)
        {
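            // Only flush a non-empty batch, so an oversized first event never triggers an empty send
            if (batchList.Count > 0 && (batchSize + eventData.SerializedSizeInBytes) > MaxBathSizeInBytes)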
            {
                // Send current batch
                eventHubClient.SendBatch(batchList);
                Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));

                // Initialize a new batch
                batchList = new List<EventData> { eventData };
                batchSize = eventData.SerializedSizeInBytes;
            }
            else
            {
                // Add the EventData to the current batch
                batchList.Add(eventData);
                batchSize += eventData.SerializedSizeInBytes;
            }
        }
        // The final batch is sent outside of the loop
        eventHubClient.SendBatch(batchList);
        Trace.WriteLineIf(trace, string.Format(SendPartitionedBatchFormat, batchSize, batchList.Count));
    }
}
0
votes

Even when batching, 256 KB is the limit for both Azure Event Hub and Azure Service Bus. Minifying, compressing, or writing the payload to Blob Storage/CosmosDB seems a more feasible solution.