6
votes

So I'm trying to figure out the structure behind general use cases of a CQRS+ES architecture and one of the problems I'm having is how aggregates are represented in the event store. If we divide the events into streams, what exactly would a stream represent? In the context of a hypothetical inventory management system that tracks a collection of items, each with an ID, product code, and location, I'm having trouble visualizing the layout of the system.

From what I could gather on the internet, it could be described succinctly as "one stream per aggregate." So I would have an Inventory aggregate: a single stream with ItemAdded, ItemPulled, ItemRestocked, etc. events, each with serialized data containing the item ID, quantity changed, location, etc. The aggregate root would contain a collection of InventoryItem objects (each with its respective quantity, product code, location, etc.). That seems like it would allow for easily enforcing domain rules, but I see one major flaw in this: when applying those events to the aggregate root, you would first have to rebuild that collection of InventoryItem objects. Even with snapshotting, that seems to be very inefficient with a large number of items.

Another method would be to have one stream per InventoryItem, tracking all events pertaining to only that item. Each stream is named with the ID of that item. That seems like the simpler route, but now how would you enforce domain rules like ensuring product codes are unique or that you're not putting multiple items into the same location? It seems like you would now have to bring in a Read model, but isn't the whole point to keep commands and queries separate? It just feels wrong.

So my question is 'which is correct?' Partially both? Neither? Like most things, the more I learn, the more I learn that I don't know...

2
To me if you have an issue with too many events in one aggregate, it indicates that your boundaries need to be reconsidered. - Alexey Zimarev

2 Answers

4
votes

In a typical event store, each event stream is an isolated transaction boundary. Any time you change the model you lock the stream, append new events, and release the lock. (In designs that use optimistic concurrency, the boundaries are the same, but the "locking" mechanism is slightly different).
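The optimistic variant can be sketched as an append that carries the version the writer last saw. This is a minimal in-memory illustration, not any particular product's API: `InMemoryEventStore`, `ConcurrencyError`, and the stream name `item-42` are all hypothetical, and a real store would have to perform the version check and the append as one atomic step.

```python
class ConcurrencyError(Exception):
    """Raised when a writer's expected version is stale."""


class InMemoryEventStore:
    """Hypothetical store: each stream is its own consistency boundary."""

    def __init__(self):
        self._streams = {}  # stream name -> list of events

    def read(self, stream):
        events = list(self._streams.get(stream, []))
        return events, len(events)  # the events plus the current version

    def append(self, stream, expected_version, new_events):
        current = self._streams.setdefault(stream, [])
        # Optimistic check: reject the write if another writer got there first.
        if len(current) != expected_version:
            raise ConcurrencyError(
                f"{stream}: expected version {expected_version}, found {len(current)}"
            )
        current.extend(new_events)
        return len(current)


store = InMemoryEventStore()
_, version = store.read("item-42")
store.append("item-42", version, [{"type": "ItemAdded", "quantity": 5}])

try:
    # A second writer still holding the stale version 0 is rejected.
    store.append("item-42", version, [{"type": "ItemPulled", "quantity": 1}])
    conflict = False
except ConcurrencyError:
    conflict = True
```

Writes to different streams never conflict with each other here, which is what makes the stream the transaction boundary.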

You will almost certainly want to ensure that any aggregate is enclosed within a single stream -- sharing an aggregate between two streams is analogous to sharing an aggregate across two databases.

A single stream can be dedicated to a single aggregate, to a collection of aggregates, or even to the entire model. Aggregates that are part of the same stream can be changed in the same transaction -- huzzah! -- at the cost of some contention and a bit of extra work to do when loading an aggregate from the stream.

The most commonly discussed design assigns each logical stream to a single aggregate.

That seems like it would allow for easily enforcing domain rules, but I see one major flaw in this: when applying those events to the aggregate root, you would first have to rebuild that collection of InventoryItem objects. Even with snapshotting, that seems to be very inefficient with a large number of items.

There are a couple of possibilities. In some models, especially those with a strong temporal component, it makes sense to model some "entities" as a time series of aggregates. For example, in a scheduling system, rather than Bob's Calendar you might instead have Bob's March Calendar, Bob's April Calendar, and so on. Chopping the life cycle into smaller installments can keep the event count in check.
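One way to picture the time-series idea is in how the stream name is derived. The naming scheme and the `calendar_stream` helper below are assumptions made purely for illustration:

```python
from datetime import date


def calendar_stream(owner_id, day):
    """Name of the stream holding one owner's calendar for one month."""
    return f"calendar-{owner_id}-{day:%Y-%m}"


# Appointments in different months land in different, shorter streams.
march = calendar_stream("bob", date(2024, 3, 14))
april = calendar_stream("bob", date(2024, 4, 2))
```

Each monthly stream is then loaded and locked on its own, so the number of events replayed per command stays bounded by what happens in a month.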

Another possibility is snapshots, with an additional trick to it: each snapshot is annotated with metadata that describes where in the stream the snapshot was made, and you simply read the stream forward from that point.

This, of course, depends on having an event stream implementation that supports random access, or one that allows you to read last in, first out.
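A rough sketch of loading from a snapshot, assuming the stream is a plain list that supports random access and that events carry a hypothetical `item_id`/`delta` shape:

```python
def apply(state, event):
    """Fold one event into a {item_id: quantity} state (hypothetical event shape)."""
    new_state = dict(state)
    item = event["item_id"]
    new_state[item] = new_state.get(item, 0) + event["delta"]
    return new_state


def load(events, snapshot=None):
    """Rebuild state from a snapshot plus only the events recorded after it."""
    if snapshot is None:
        state, position = {}, 0
    else:
        state, position = snapshot["state"], snapshot["position"]
    for event in events[position:]:  # skip everything the snapshot already covers
        state = apply(state, event)
    return state, len(events)


events = [
    {"item_id": "A", "delta": 5},
    {"item_id": "B", "delta": 2},
    {"item_id": "A", "delta": -1},
]

# Take a snapshot after the first two events; "position" is the metadata
# that says where in the stream the snapshot was made.
state, position = load(events[:2])
snapshot = {"state": state, "position": position}

# Later loads replay only the tail of the stream.
current, _ = load(events, snapshot)
```

Loading with and without the snapshot must give the same state; the snapshot only changes how many events are replayed.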

Keep in mind that both of these are really performance optimizations, and the first rule of optimization is... don't.

4
votes

So I'm trying to figure out the structure behind general use cases of a CQRS+ES architecture and one of the problems I'm having is how aggregates are represented in the event store

The event store in a DDD project is designed around event-sourced Aggregates:

  1. it provides efficient loading of all events previously emitted by an Aggregate root instance (having a given, specified ID)
  2. those events must be retrieved in the order they were emitted
  3. it must not permit appending events at the same time for the same Aggregate root instance
  4. all events emitted as the result of a single command must be appended atomically; this means that they should all succeed or all fail

The 4th point could be implemented using transactions, but this is not a necessity. In fact, for scalability reasons, if you can, you should choose a persistence mechanism that gives you atomicity without the use of transactions. For example, you could store the events in a MongoDB document, as MongoDB guarantees document-level atomicity.

The 3rd point can be implemented using optimistic locking, using a version column with a unique index per (version x AggregateType x AggregateId).
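As a sketch of points 2 to 4, here is one possible layout using SQLite from the Python standard library. The table and function names are assumptions, and this version does use a transaction for atomicity (the first option mentioned above) rather than a single-document write:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        aggregate_type TEXT NOT NULL,
        aggregate_id   TEXT NOT NULL,
        version        INTEGER NOT NULL,
        payload        TEXT NOT NULL,
        UNIQUE (aggregate_type, aggregate_id, version)
    )
    """
)


def append(aggregate_type, aggregate_id, expected_version, events):
    """Append all events or none; return False if another writer won the race."""
    rows = [
        (aggregate_type, aggregate_id, expected_version + offset, json.dumps(event))
        for offset, event in enumerate(events, start=1)
    ]
    try:
        with conn:  # commits on success, rolls back on any error
            conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", rows)
        return True
    except sqlite3.IntegrityError:
        return False


def load(aggregate_type, aggregate_id):
    """Return the aggregate's events in the order they were emitted."""
    cursor = conn.execute(
        "SELECT payload FROM events WHERE aggregate_type = ? AND aggregate_id = ? "
        "ORDER BY version",
        (aggregate_type, aggregate_id),
    )
    return [json.loads(payload) for (payload,) in cursor]


first = append("InventoryItem", "42", 0, [{"type": "ItemAdded"}, {"type": "ItemRestocked"}])
# A writer that also loaded version 0 collides on version 1, so nothing is written.
stale = append("InventoryItem", "42", 0, [{"type": "ItemPulled"}, {"type": "ItemPulled"}])
```

The unique index does the work of the optimistic lock: two writers that both loaded the same version will try to insert the same next version, and only one insert can succeed.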

At the same time, there is a DDD rule regarding Aggregates: don't mutate more than one Aggregate per transaction. This rule helps you A LOT to design a scalable system. Break it only if you don't need a scalable system.

So, the solution to all these requirements is something called an Event-stream, which contains all the events previously emitted by an Aggregate instance.

So I would have an Inventory aggregate

DDD takes precedence over the Event-store. So, if you have business rules that force you to decide that you must have a (big) Inventory aggregate, then yes, it would load ALL the events it has previously emitted. The InventoryItem would then be a nested entity that cannot emit events by itself.
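To make the trade-off concrete, a single big aggregate might look roughly like the sketch below. The class shape, the event fields, and the two invariants (unique product code, one item per location, both taken from the question) are illustrative assumptions, not a prescribed design:

```python
class Inventory:
    """Aggregate root; items are nested entities and never emit events themselves."""

    def __init__(self, history=()):
        self.items = {}    # item_id -> {"code": ..., "location": ...}
        self.changes = []  # new events awaiting append to the single stream
        for event in history:
            self._apply(event)  # every load replays the whole stream

    def add_item(self, item_id, code, location):
        # Invariants are easy to check because the whole collection is loaded.
        if any(item["code"] == code for item in self.items.values()):
            raise ValueError(f"product code {code} already in use")
        if any(item["location"] == location for item in self.items.values()):
            raise ValueError(f"location {location} already occupied")
        event = {"type": "ItemAdded", "item_id": item_id, "code": code, "location": location}
        self._apply(event)
        self.changes.append(event)

    def _apply(self, event):
        if event["type"] == "ItemAdded":
            self.items[event["item_id"]] = {
                "code": event["code"],
                "location": event["location"],
            }


inventory = Inventory()
inventory.add_item("item-1", "P-100", "A1")
rebuilt = Inventory(history=inventory.changes)  # replaying the stream restores the items
```

The rules are trivial to enforce here, but the constructor shows the cost: every command first rebuilds every item.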

That seems like it would allow for easily enforcing domain rules, but I see one major flaw in this: when applying those events to the aggregate root, you would first have to rebuild that collection of InventoryItem objects. Even with snapshotting, that seems to be very inefficient with a large number of items.

Yes, indeed. The simplest design would be a single Aggregate with a single instance; consistency would then be the strongest possible. But this is not efficient, so you need to think harder about the real business requirements.

Another method would be to have one stream per InventoryItem, tracking all events pertaining to only that item. Each stream is named with the ID of that item. That seems like the simpler route, but now how would you enforce domain rules like ensuring product codes are unique or that you're not putting multiple items into the same location?

There is another possibility. You should model the assigning of product codes as a Business Process. For this you could use a Saga/Process manager that would orchestrate the entire process. This Saga could use a collection with a unique index on the product code column in order to ensure that only one product uses a given product code.

You could design the Saga to permit the allocation of an already-taken code to a product and to compensate later or to reject the invalid allocation in the first place.
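A minimal sketch of the rejecting variant, with a dictionary standing in for the uniquely indexed collection. The class, method, and command names are hypothetical:

```python
class ProductCodeSaga:
    """Hypothetical process manager that reserves product codes for items."""

    def __init__(self):
        self._owner_by_code = {}  # stands in for a collection with a unique index

    def handle_code_requested(self, item_id, code):
        # setdefault only stores item_id when the code is still free.
        owner = self._owner_by_code.setdefault(code, item_id)
        if owner == item_id:
            return {"type": "ConfirmProductCode", "item_id": item_id, "code": code}
        # Reject up front; a compensating design would instead accept the
        # allocation and send a corrective command to the item afterwards.
        return {"type": "RejectProductCode", "item_id": item_id, "code": code}


saga = ProductCodeSaga()
first = saga.handle_code_requested("item-1", "SKU-100")
second = saga.handle_code_requested("item-2", "SKU-100")
```

Handling the same request twice for the same item confirms it again, so redelivered events do not cause a false rejection.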

It seems like you would now have to bring in a Read model, but isn't the whole point to keep commands and queries separate? It just feels wrong.

The Saga does indeed use private state, built from the domain events and eventually consistent, just like a Read-model, but this does not feel wrong to me. It may use whatever it needs in order to bring the system as a whole (eventually) to a consistent state. It complements the Aggregates, whose purpose is to not allow the building blocks of the system to get into an invalid state.