0
votes

Suppose I use a CQRS and Event Sourcing based architecture, and I would like to get an aggregate projection as of a given point in time.

Moreover, I have both a read database and a write database. The read database is populated from events and holds some aggregate projections used by the UI.

My question is: what is the best approach to reconstructing aggregate state as of a given point in time in such an architecture, and, briefly, what should it look like from an architectural point of view?

Note: I want to return such a projection to the client side.

3 Answers

1
votes

It depends on whether you expect a lot of events per aggregate.

If you do, then you can use snapshots. Save the state of an aggregate at a point in time. Then you can read this snapshot and apply all events that happened after the snapshot.

If you don't expect a lot of events, just read all events from the beginning of the stream and reconstruct the aggregate. In this case snapshotting can make things more complex and even slow things down.
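
Both options can be sketched as the same fold over events, differing only in the starting state. This is a minimal illustration, not a definitive implementation: the `AmountChanged` event, the `BalanceSnapshot` record, and the `SnapshotReplay` helper are hypothetical names invented for this example, and the in-memory filter stands in for an event store query that would normally return only events newer than the snapshot.

```java
import java.util.List;

// Hypothetical event: a signed amount applied to an account at a given stream version.
record AmountChanged(int version, long delta) {}

// Hypothetical snapshot: the aggregate state captured after applying events up to `version`.
record BalanceSnapshot(int version, long balance) {}

final class SnapshotReplay {

    // Rebuilds the balance by starting from the snapshot and applying only newer events.
    static long rebuild(BalanceSnapshot snapshot, List<AmountChanged> events) {
        long balance = snapshot.balance();
        for (AmountChanged event : events) {
            if (event.version() > snapshot.version()) {
                balance += event.delta();
            }
        }
        return balance;
    }

    // A full replay is the same operation starting from an empty snapshot at version 0.
    static long rebuildFromScratch(List<AmountChanged> events) {
        return rebuild(new BalanceSnapshot(0, 0L), events);
    }
}
```

Replaying from a snapshot and replaying from the beginning should give the same result, which is a useful property to check in tests before relying on snapshots.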

Here are some resources that you can check:

https://blog.jonathanoliver.com/event-sourcing-and-snapshots/
https://martinfowler.com/eaaDev/EventSourcing.html

0
votes

If you have an Event Sourcing architecture, you already store your events in an "append-only" event store. From an architectural point of view, you need to deserialize the events and re-apply them (an in-memory projection) to reconstruct the state of your aggregate.

An EventStore interface would look something like this:

public interface EventStore {

    public void appendWith(EventStreamId aStartingIdentity, List<DomainEvent> anEvents);

    public void close();

    public EventStream eventStreamSince(EventStreamId anIdentity);

    public EventStream fullEventStreamFor(EventStreamId anIdentity);
}

Then, in your repository, you pass the whole event stream to your aggregate, which is responsible for applying the in-memory projection:

public class EventStoreForumRepository
    extends EventStoreProvider
    implements ForumRepository {

    @Override
    public Forum forumOfId(Tenant aTenant, ForumId aForumId) {
        // snapshots not currently supported; always use version 1

        EventStreamId eventId = new EventStreamId(aTenant.id(), aForumId.id());

        EventStream eventStream = this.eventStore().eventStreamSince(eventId);

        Forum forum = new Forum(eventStream.events(), eventStream.version());

        return forum;
    }
}

Then the aggregate part:

public abstract class EventSourcedRootEntity {

    private List<DomainEvent> mutatingEvents;
    private int unmutatedVersion;

    public int mutatedVersion() {
        return this.unmutatedVersion() + 1;
    }

    public List<DomainEvent> mutatingEvents() {
        return this.mutatingEvents;
    }

    public int unmutatedVersion() {
        return this.unmutatedVersion;
    }

    protected EventSourcedRootEntity(List<DomainEvent> anEventStream, int aStreamVersion) {

        for (DomainEvent event : anEventStream) {
            this.mutateWhen(event);
        }

        this.setUnmutatedVersion(aStreamVersion);
    }
}

Your aggregate should extend EventSourcedRootEntity, and on save the EventStore should persist only the new events held in mutatingEvents.

The samples are written in Java and taken from the repository of Vaughn Vernon, author of the book Implementing Domain-Driven Design (IDDD).

https://github.com/VaughnVernon/IDDD_Samples

0
votes

Assuming the number of events per aggregate is small, you can respond to a request for the state at a point in time by just streaming the events for that aggregate up to that time into an in-memory projection. If your projection relies only on events from one aggregate type, you're done: just return the projection. If you have too many events per aggregate for this, you can store periodic snapshots of each aggregate projection (say, every 100 events for a particular aggregate).
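
The single-aggregate case could look roughly like the sketch below. The `SubjectChanged` event, the `ForumView` read model, and the `PointInTimeProjection` helper are hypothetical names made up for illustration. The sketch also assumes every event records when it occurred and that the stream is returned in occurrence order.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

// Hypothetical event for a forum aggregate; each event carries the time it occurred.
record SubjectChanged(Instant occurredOn, String subject) {}

// Hypothetical read model that would be returned to the client.
record ForumView(String subject, int version) {}

final class PointInTimeProjection {

    // Folds the events that occurred at or before `asOf` into a view.
    // Assumes `events` is ordered by occurrence, as an append-only stream would be.
    static Optional<ForumView> asOf(List<SubjectChanged> events, Instant asOf) {
        ForumView view = null;
        int version = 0;
        for (SubjectChanged event : events) {
            if (event.occurredOn().isAfter(asOf)) {
                break; // every remaining event is later than the requested time
            }
            version++;
            view = new ForumView(event.subject(), version);
        }
        // Empty when the aggregate did not exist yet at `asOf`.
        return Optional.ofNullable(view);
    }
}
```

Returning an empty `Optional` for a time before the first event lets the API layer decide how to report "did not exist yet" to the client.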

The hitch is that you may also need other information in your projection, from other event streams. This makes it more complicated. Options include:

  • Fetch events from the other specific streams that affect your particular projection instance, if it's easy to work them out, and they aren't too big
  • Use Datomic as your event and projection store, since it supports time-based querying and projections of the sort you might need natively
  • Store the projection in a database using temporal features (SQL:2011?) or implement the time feature manually: when a "row" changes, rather than updating it, append a new one with fields for the time range during which it is valid.
    • This makes querying very simple and fast
    • But requires more storage - rather than a document store or similar, you might need to use a normalized database structure for the projection with the aggregate split across multiple temporal tables to avoid having to duplicate too much for each change to a projection.
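
The manual variant of the last option can be sketched in memory as follows. This only illustrates the idea of valid-time rows and is not a database implementation: `ProjectionRow` and `TemporalProjectionTable` are hypothetical names, the list stands in for a table, and `Instant.MAX` is used here as an assumed marker for "still current". Note that closing the previous row's validity range is the one field this sketch does rewrite.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical temporal row: one version of a projection, valid over [validFrom, validTo).
record ProjectionRow(String aggregateId, String state, Instant validFrom, Instant validTo) {}

final class TemporalProjectionTable {

    private final List<ProjectionRow> rows = new ArrayList<>();

    // Appends a new version instead of overwriting the state;
    // the currently open row for the aggregate is closed at `changedAt`.
    void append(String aggregateId, String state, Instant changedAt) {
        for (int i = 0; i < rows.size(); i++) {
            ProjectionRow row = rows.get(i);
            if (row.aggregateId().equals(aggregateId) && row.validTo().equals(Instant.MAX)) {
                rows.set(i, new ProjectionRow(row.aggregateId(), row.state(), row.validFrom(), changedAt));
            }
        }
        rows.add(new ProjectionRow(aggregateId, state, changedAt, Instant.MAX));
    }

    // Point-in-time query: the row whose validity range contains `asOf`, if any.
    Optional<String> stateAsOf(String aggregateId, Instant asOf) {
        return rows.stream()
            .filter(row -> row.aggregateId().equals(aggregateId))
            .filter(row -> !asOf.isBefore(row.validFrom()) && asOf.isBefore(row.validTo()))
            .map(ProjectionRow::state)
            .findFirst();
    }
}
```

With this layout a point-in-time read is a single range lookup and needs no event replay, at the cost of keeping one row per change.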