7
votes

It's clear that system based on these patterns is easily scalable. But I would like to ask you, how exactly? I have few questions regarding scalability:

  1. How to scale aggregates? If I will create multiple instances of aggregate A, how to sync them? If one of the instances process the command and create an event, this event should be propagated to every instance of that agregate?
  2. Shouldn't be there some business logic present which instance of the agregate to request? So if I am issuing multiple commands which applies to aggregate A (ORDERS) and applies to one specific order, it make sense to deliver it to the same instance. Or?

In this article: https://initiate.andela.com/event-sourcing-and-cqrs-a-look-at-kafka-e0c1b90d17d8, they are using Kafka with a partitioning. So the user management service - aggregate is scaled but is subscribed only to specific partition of the topic, which contains all events of a particular user.

Thanks!

3

3 Answers

5
votes

How to scale aggregates?

  • choose aggregates carefully, make sure your commands spread reasonably among many aggregates. You don't want to have an aggregate that likely to receive high number of command from concurrent users.

  • Serialize commands sent to aggregate instance. This can be done with aggregate repository and command bus/queue. But for me, the simplest way is to make optimistic locking with aggregate versioning as described in this post by Michiel Rook

which instance of the agregate to request?

In our reSolve framework we are creating instance of aggregate on every command and don't keep it between requests. This works surprisingly fast - it is faster to fetch 100 events and reduce them to aggregate state, than to find a right aggregate instance in a cluster.

This approach is scalable, lets you go serverless - one lambda invocation per command and no shared state in between. Those rare cases when aggregate has too many events are solved by snapshots.

4
votes

How to scale aggregates?

The Aggregate instances are represented by their stream of events. Every Aggregate instance has its own stream of events. Events from one Aggregate instance are NOT used by other Aggregate instances. For example, if Order Aggregate with ID=1 creates an OrderWasCreated event with ID=1001, that Event will NEVER be used to rehydrate other Order Aggregate instances (with ID=2,3,4...).

That being said, you scale the Aggregates horizontally by creating shards on the Event store based on the Aggregate ID.

If I will create multiple instances of aggregate A, how to sync them? If one of the instances process the command and create an event, this event should be propagated to every instance of that agregate?

You don't. Each Aggregate instance is completely separated from other instances.

In order to be able to scale horizontally the processing of commands, it is recommended to load each time an Aggregate instance from the Event store, by replaying all its previously generated events. There is one optimization that you can do to boost performance: Aggregate snapshots, but it is recommended to do it only if it's really needed. This answer could help.

Shouldn't be there some business logic present which instance of the agregate to request? So if I am issuing multiple commands which applies to aggregate A (ORDERS) and applies to one specific order, it make sense to deliver it to the same instance. Or?

You assume that the Aggregate instances are running continuously on some servers' RAM. You could do that but such an architecture is very complex. For example, what happens when one of the servers goes down and it must be replaced by other? It's hard to determine what instances where living there and to restart them. Instead, you could have many stateless servers that could handle commands for any of the aggregate instances. When a command arrives, you identity the Aggregate ID, you load it from the Event store by replaying all its previous events and then it can execute the command. After the command is executed and the new events are persisted to the Event store, you can discard the Aggregate instance. The next command that arrives for the same Aggregate instance could be handled by any other stateless server. So, scalability is dictated only by the scalability of the Event store itself.

0
votes

How to scale aggregates?

Each piece of information in the system has a single logical authority. Multiple authorities for a single piece of data gets you contention. You scale the writes by creating smaller non overlapping boundaries -- each authority has a smaller area of responsibility

To borrow from your example, an example of smaller responsibilities would
be to shift from one aggregate for all ORDERS to one aggregate for _each_
ORDER.

It's analogous to the difference between having a key value store with
all ORDERS stored in a document under one key, vs each ORDER being stored
using its own key.

Reads are safe, you can scale them out with multiple copies. Those copies are only eventually consistent, however. This means that if you ask "what is the bid price of FCOJ now?" you may get different answers from each copy. Alternatively, if you ask "what was the bid price of FCOJ at 10:09:02?" then each copy will either give you a single answer or say "I don't know yet".

But if the granularity is already one command per aggregate, what is not very often possible in my opinion, and you have really many concurrent accesses, how to solve it? How to spread the load and stay without the conflict as much as possible?

Rough sketch - each aggregate it stored via a key that can be computed from the contents of the command message. Update to the aggregate is achieved by a compare-and-swap operation using that key.

Acquire a message
Compute the storage key
Load a versioned representation from storage
Compute a new versioned representation
Store.compare and swap the new representation for the old

To provide additional traffic throughput, you add more stateless compute.

To provide storage throughput, you distribute the keys across more storage appliances.

A routing layer can be used to group messages together - the routers uses the same storage key calculation as before, but uses that to choose where in the compute farm to forward the message. The compute can then check each batch of messages it receives for duplicate keys, and process those messages together (trading some extra compute to reduce the number of compare and swaps).

Sane message protocols are important; see Marc de Graauw's Nobody Needs Reliable Messaging.