1 vote

In Event Sourcing, you store all the individual Domain Events that have happened for one Aggregate instance, known as an Event Stream. Along with the Event Stream, you also store a Stream Version.

Should the version be associated with each Domain Event, or should it be associated with transactional changes (aka commands)?


Example:

The current state of our Event Store is:

aggregate_id | version | event
-------------|---------|------
1            | 1       | E1
1            | 2       | E2

A new command is executed on aggregate 1. This command produces two new events, E3 and E4.

Approach 1:

aggregate_id | version | event
-------------|---------|------
1            | 1       | E1
1            | 2       | E2
1            | 3       | E3
1            | 4       | E4

With this approach, optimistic concurrency can be handled by the storage mechanism using a unique index, but replaying the events up to version 3 could leave the aggregate/system in an inconsistent state.
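A minimal sketch of how the unique index provides that concurrency check, using SQLite in Python; the table and column names mirror the example above, everything else is illustrative:

```python
import sqlite3

# Approach 1: one version per event. The UNIQUE constraint on
# (aggregate_id, version) is what enforces optimistic concurrency:
# two writers racing to append version 3 cannot both succeed.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id INTEGER NOT NULL,
        version      INTEGER NOT NULL,
        event        TEXT    NOT NULL,
        UNIQUE (aggregate_id, version)
    )
""")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [(1, 1, "E1"), (1, 2, "E2")],
)

# A command produced E3 and E4; each gets its own version.
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [(1, 3, "E3"), (1, 4, "E4")],
)

# A concurrent writer that also tries to append version 3 fails:
try:
    conn.execute("INSERT INTO events VALUES (?, ?, ?)", (1, 3, "E3'"))
except sqlite3.IntegrityError as e:
    print("optimistic concurrency conflict:", e)
```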

Approach 2:

aggregate_id | version | event
-------------|---------|------
1            | 1       | E1
1            | 2       | E2
1            | 3       | E3
1            | 3       | E4

Replaying the events up to version 3 leaves the aggregate/system in a consistent state.
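A small illustration of why: when the version marks the commit rather than the event, replaying up to any version boundary always yields whole commits (plain Python, names illustrative):

```python
# Approach 2: the version marks the commit, so events produced by one
# command share it. Replaying up to a version boundary never splits
# a commit in half.
event_stream = [
    (1, "E1"),
    (2, "E2"),
    (3, "E3"),
    (3, "E4"),  # E3 and E4 came from the same command
]

def replay_up_to(stream, max_version):
    return [event for version, event in stream if version <= max_version]

print(replay_up_to(event_stream, 3))  # ['E1', 'E2', 'E3', 'E4']
```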

Thanks!

I'm still researching the pros/cons of the two approaches, and also checking which approach is used in IDDD and other DDD books. - martinezdelariva

4 Answers

3 votes

Short answer: #1.

The write of events E3 and E4 should be part of the same transaction.

Notice that the two approaches don't really differ in the case you are concerned about. If your read in the first case can miss E4, then so can your read in the second case. In the use case where you are loading the aggregate to do a write, loading the first three events will tell you that the next version should be #4.

In the case of approach #1, attempting to write version 4 produces a unique constraint conflict; the command handler won't be able to tell whether the problem was a bad load of the data, or simply an optimistic concurrency failure, but in either case the result is no write, and the book of record is still in a consistent state.

In the case of approach #2, attempting to write version 4 doesn't conflict with anything. The write succeeds, and now you have E5 that is not consistent with E4. Bleah.
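A sketch of that failure mode under approach #2, assuming a SQLite store with no uniqueness constraint on (aggregate_id, version):

```python
import sqlite3

# Approach 2: versions are not unique per row, so a writer that loaded
# a stale stream can append without any conflict being detected.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id INTEGER NOT NULL,
        version      INTEGER NOT NULL,
        event        TEXT    NOT NULL
        -- no UNIQUE (aggregate_id, version): E3 and E4 share version 3
    )
""")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [(1, 1, "E1"), (1, 2, "E2"), (1, 3, "E3"), (1, 3, "E4")],
)

# A reader that saw only E1..E3 believes the next version is 4.
# Nothing stops this write, even though it was computed without E4:
conn.execute("INSERT INTO events VALUES (?, ?, ?)", (1, 4, "E5"))
print(conn.execute("SELECT * FROM events ORDER BY version").fetchall())
```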


My preferred schema, assuming that you are compelled to roll your own, separates the stream from the events.

stream_id    | sequence | event_id
-------------|----------|---------
1            | 1        | E1
1            | 2        | E2

The stream gives you a filter (stream id) to identify the events you want, and an order (sequence) to ensure the events you read are in the same order as the events you write. But beyond that, it's kind of an artificial thing, a side effect of the way that we happened to choose our aggregate boundaries. So its role should be pretty limited.

The actual event data lives somewhere else:

event_id | data | meta_data | ...
---------|------|-----------|----
E1       | ...  | ...       | ...
E2       | ...  | ...       | ...

If you need to be able to identify the events associated with a particular command, that's part of the event meta-data, not part of the stream history (see: correlationId, causationId).
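A sketch of this two-table layout, again using SQLite; the JSON metadata payload and its keys are illustrative assumptions, not a prescribed format:

```python
import sqlite3

# The stream table only filters and orders event ids; the event table
# carries the data and metadata (e.g. correlationId, causationId).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE streams (
        stream_id INTEGER NOT NULL,
        sequence  INTEGER NOT NULL,
        event_id  TEXT    NOT NULL,
        UNIQUE (stream_id, sequence)
    );
    CREATE TABLE events (
        event_id  TEXT PRIMARY KEY,
        data      TEXT,
        meta_data TEXT  -- e.g. JSON holding correlationId, causationId
    );
""")
conn.execute("INSERT INTO events VALUES (?, ?, ?)",
             ("E1", '{"amount": 10}',
              '{"correlationId": "cmd-42", "causationId": "cmd-42"}'))
conn.execute("INSERT INTO streams VALUES (?, ?, ?)", (1, 1, "E1"))

# Reading a stream: filter by stream_id, order by sequence, join for data.
rows = conn.execute("""
    SELECT s.sequence, e.event_id, e.data, e.meta_data
    FROM streams s JOIN events e ON e.event_id = s.event_id
    WHERE s.stream_id = ? ORDER BY s.sequence
""", (1,)).fetchall()
print(rows)
```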

2 votes

Nothing prevents you from introducing a commit_sequence along with a version.

For instance, in NEventStore you can see that a commit has a StreamRevision (the version, which increases with every event) and a CommitSequence.
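A rough sketch of how the two counters can coexist on the same rows (hypothetical column names; NEventStore's actual storage schema varies by persistence engine):

```python
from itertools import groupby

# Hypothetical rows carrying both counters: stream_revision increases
# with every event, commit_sequence increases with every command.
rows = [
    # (aggregate_id, stream_revision, commit_sequence, event)
    (1, 1, 1, "E1"),
    (1, 2, 2, "E2"),
    (1, 3, 3, "E3"),  # E3 and E4 came from the same command,
    (1, 4, 3, "E4"),  # so they share commit_sequence 3
]

# Replaying whole commits at a time keeps the aggregate consistent:
for commit, events in groupby(rows, key=lambda r: r[2]):
    print(f"commit {commit}:", [e[3] for e in events])
```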

0 votes

Approach 1 is what I've used and seen others use: just an incrementing number for the event, often called EventNumber.

The optimistic concurrency part is just so that, when you load your aggregate, you know what the latest event is. You then process the command and save any resulting events: if you see anything above the number you loaded, it means you're already out of date and can act accordingly; if not, you can save the events.
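A minimal in-memory sketch of that load/process/save cycle (all names are illustrative):

```python
class ConcurrencyError(Exception):
    pass

store = {"agg-1": ["E1", "E2"]}  # stream keyed by aggregate id

def load(aggregate_id):
    events = store[aggregate_id]
    return events, len(events)  # events plus the latest event number

def save(aggregate_id, expected, new_events):
    current = store[aggregate_id]
    if len(current) != expected:
        # Someone appended since we loaded: we're already out of date.
        raise ConcurrencyError(f"expected {expected}, found {len(current)}")
    current.extend(new_events)

events, version = load("agg-1")
save("agg-1", version, ["E3", "E4"])  # succeeds
try:
    save("agg-1", version, ["E3'"])   # stale: the stream moved on
except ConcurrencyError as e:
    print(e)
```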

0 votes

In Domain Driven Design with Event Sourcing, an aggregate represents a consistency boundary, and its invariants must be true at the beginning and end of each command (or function call). You can violate an invariant in the middle of a member function, so long as it is not violated by the end of it.

What you have pointed out in your post is very insightful. That is, if a single command (or call to a member function) on an aggregate produces multiple events, then storing only some of these events may lead to a violation of your invariant when another process reloads the aggregate from disk. When using an SQL database as an event store, there are a number of related scenarios that can produce this issue.

The first (and easiest) way to avoid this is to wrap all your event INSERT statements in a transaction, so that either all the events are persisted or none of them are (e.g., due to a concurrency conflict). That way, the "on disk" representation of your invariant is maintained. You also have to make sure that your transaction isolation level is not READ UNCOMMITTED, so that other processes don't see half of your commit.

Finally, you have to ensure that the database will not "interleave" event sequence numbers between processes. E.g., the database allocates sequence number 1 for an event in process A, sequence number 2 for an event in process B, then sequence number 3 for a second event in process A again. All events can be committed to the database because there are no conflicts on the concurrency constraint (of aggregate ID + event sequence number), but the sequence of events was written by two processes, and so your invariant may still be violated.
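A sketch of this first option, assuming SQLite: the writer assigns the version numbers itself (no database sequence, so no interleaving) and appends all events from one command in a single transaction:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        aggregate_id INTEGER NOT NULL,
        version      INTEGER NOT NULL,
        event        TEXT    NOT NULL,
        UNIQUE (aggregate_id, version)
    )
""")

def append(conn, aggregate_id, expected_version, events):
    """Append all events from one command atomically.

    Versions are computed by the writer from the version it loaded,
    so two processes cannot interleave within a commit.
    """
    try:
        with conn:  # one transaction: all events persist, or none do
            for offset, event in enumerate(events, start=1):
                conn.execute(
                    "INSERT INTO events VALUES (?, ?, ?)",
                    (aggregate_id, expected_version + offset, event),
                )
    except sqlite3.IntegrityError:
        raise RuntimeError("concurrent write detected; reload and retry")

append(conn, 1, 0, ["E1", "E2"])
append(conn, 1, 2, ["E3", "E4"])  # both rows committed together
```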

A second option is to wrap all of your events into an array that is persisted with a single INSERT statement. This essentially gives you a version number per commit, rather than a version number per event. To me, this is more logical, but it requires you to have a procedure to "unflatten" the event array before sending it to the various event handlers and process managers.

I personally use this second mechanism in a project that stores events in raw binary format on disk. The events themselves contain only the minimal amount of information necessary for the aggregate to change state - the events do not even include the aggregate identifier. The commit, on the other hand, does contain the aggregate identifier, the commit sequence number, and various other metadata. This essentially separates functionality between the aggregate as a handler for uncommitted events and event handlers for committed events. This distinction also makes sense, because if an event is a "fact" - something happened - then there is a difference between what the aggregate did and whether what the aggregate did was actually saved to disk.
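A sketch of this second option, using JSON rather than raw binary for readability; the table and column names are illustrative:

```python
import json
import sqlite3

# One row per commit: the events are "flattened" into a serialized
# array, so a single INSERT is atomic by nature.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE commits (
        aggregate_id    INTEGER NOT NULL,
        commit_sequence INTEGER NOT NULL,
        events          TEXT    NOT NULL,  -- serialized event array
        UNIQUE (aggregate_id, commit_sequence)
    )
""")
conn.execute(
    "INSERT INTO commits VALUES (?, ?, ?)",
    (1, 1, json.dumps(["E3", "E4"])),  # whole commit in one statement
)

# "Unflattening" before dispatching to event handlers:
for agg, seq, payload in conn.execute("SELECT * FROM commits"):
    for event in json.loads(payload):
        print(f"dispatch {event} (aggregate {agg}, commit {seq})")
```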

On a theoretical note, a good example of your issue is that of linked lists - think about an in-memory representation only: no persistence on disk. One of the reasons you'd use a linked list over a vector or an array is that it allows efficient inserts of nodes (well, more efficient than arrays). The insertion operation typically requires the current node's "next" pointer to be set to the new node's memory address and the new node's "next" pointer to be set to the current node's previous "next" pointer. If another process were reading the same linked list in memory after the first operation completed but before the second operation completed, it would not see all the nodes in the linked list. If each "operation" is like an "event", then only seeing the first event causes the reader to see a broken linked list.
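The same point in a few lines of Python, following the insertion order described above:

```python
# The two pointer writes of a linked-list insert, with the broken
# intermediate state a concurrent reader would observe.
class Node:
    def __init__(self, value, next=None):
        self.value = value
        self.next = next

head = Node("A", Node("C"))   # list: A -> C
new = Node("B")

old_next = head.next
head.next = new               # operation 1: A -> B, but B.next is None
# A reader traversing the list here sees A -> B and then nothing;
# node C has effectively vanished - the "inconsistent state".
new.next = old_next           # operation 2: B -> C; list is A -> B -> C
```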