1
votes

I am learning about DDD,CQRS and Event-sourcing and there is something I cannot figure out. Commands trigger changes in the aggregates and once the change is performed an event is fired. The event is subsequently handled by other parts of the system and preserved in the event store. However, I do not understand how replaying events would recreate the aggregate, if changes are triggered by commands.

Example: If we have a online shop. AddItemToCardCommand -> Card Aggregate adds the item to its card -> ItemAddedToCardEvent -> The event is handled by whoever. However, if the event is replayed, the aggregate would not add the item to its card.

To sum up, my question is how should I recreate aggregates based on the events in the event store? Also, any general advice on how to replay events the right way would be appreaciated.

3

3 Answers

1
votes

For simplicity, let's assume a stateless process - our service doesn't try to keep copies of things in memory, but instead reloads aggregates as needed.

The service receives AddItemToCardCommand:{card:123, ...}. We don't have the current state of card:123 in memory, so we need to create it. We do that by loading the state of card:123 from our durable store. Because we chose to use event sourced storage, the "state" we read from the durable store is a representation of the history of events previously written by the service.

Event histories have within them all of the information you need to remember, but not necessarily in a convenient "shape" - append only lists are a great data structure for writes, but not necessarily good for reads.

What this often means is that we will "replay" the events to create an in memory object which we can then use to answer questions about the events we will write next.

The same pattern is used when answering simple queries: we load the history of events from the store, transform the event history into a more convenient shape, and then use that shape to compute the answer.

In circumstances where query latency is more important than timeliness, we might design our query handler to read the convenient shapes from a cache, rather than trying to compute them fresh every time; a concurrently running background thread would be responsible to waking up periodically to compute new contents for the cache.

Using an async process to pull updates from an event stream is a common pattern; Greg Young discusses some of the advantages of that approach in his Polyglot Data talk.

1
votes

In an ideal event scenario, you would not have an already constructed aggregate structure available in your database. You repeatedly arrive at the end data structure by running through all events stored so far.

Let me illustrate with some pseudocode of adding items to cart, and then fetching the cart data.

# Create a new cart
POST /cart/new

# Store a series of events related to the cart (in database as records, similar to array items)
POST /cart/add -> CartService.AddItem(item_data) -> ItemAddedToCart

A series of events would look like:

* ItemAddedToCart
* ItemAddedToCart
* ItemAddedToCart
* ItemRemovedFromCart
* ItemAddedToCart

When its time to fetch cart data from the DB, you construct a new cart instance (or retrieve a cart instance if persisted) and replay the events on it.

cart = Cart(id=ID1)

# Fetch contents of Cart with id ID1
for each event in ID1 cart's events:
    if event is ItemAddedToCart:
        cart.add_item(event.data)
    else if event is ItemRemovedFromCart:
        cart.remove_item(event.data)

return cart

Occasionally, when there are too many events related to the cart, you may want to generate the aggregate structure then and save it in DB. Next time, you can start with the aggregate structure savepoint, and continue applying new events. This optimization helps save time and improve performance when there are too many events to process.

0
votes

What may help is to not think of the command as changing the state but rather the event as changing the state. In fact, I don't quite see how else one would go about doing so. The command handler in your aggregate would apply the invariants and, if all is OK, would immediately create the event and call some method that would apply it ([Apply|On|Do]MyEvent). The fact that you have an event after the fact does not necessarily mean other parts of your system would handle it. It is however required for event sourcing. Once you have an event you can most certainly pass that on to other parts of your system via, say, publishing on a service bus.

When you replay your events you are calling the same methods that the commands were calling to actually mutate the state of your aggregate:

public MyEvent MyCommand(string data)
{
    if (string.IsNullOrWhiteSpace(data))
    {
        throw new ArgumentException($"Argument '{nameof(data)}' may not be empty.");
    }

    return On(new MyEvent
    {
        Data = data
    });
}

private MyEvent On(MyEvent myEvent)
{
    // change the relevant state
    someState = myEvent.Data;

    return myEvent;
}

Your event sourcing infrastructure would call On(MyEvent) for MyEvent when replaying. Since you have an event it means that it was a valid state transition and can simply be applied; else something went wrong in your initial command processing and you probably have a bug.

All events in an event store would be in chronological order for an aggregate. In addition to this the events should have a global sequence number to facilitate projection processing.

You could have a generic projection that accepts any/all events and then publishes the event on a service bus for system integration. You could also place that burden on a client of the event store to have it keep track of the position itself and then read events off the store itself. You could combine these and have the client subscribe to service bus events but ensure that it executes them in the same order by keeping track of the position (global sequence number) itself and update it as the events are processed.