4
votes

I am trying to implement event sourcing/CQRS/DDD for the first time, mostly for learning purposes. There is the idea of an event store and a message queue such as Apache Kafka, with events flowing from event store => Kafka Connect JDBC/Debezium CDC => Kafka.

I am wondering why there needs to be a separate event store when it sounds like its purpose can be fulfilled by Kafka itself, using log compaction or log retention configured for permanent storage. Should I store my events in a dedicated store such as an RDBMS and feed them into Kafka from there, or should I write them straight into Kafka?

[architecture diagram: event store => Kafka Connect JDBC/Debezium CDC => Kafka, with materialised views on the read side]


3 Answers

4
votes

Much of the literature on event sourcing and CQRS comes from the domain driven design community; in its earliest form, CQRS was called DDDD... distributed domain driven design.

One of the common patterns in domain driven design is to have a domain model ensuring the integrity of the data in your durable storage, which is to say, ensuring that there are no internal contradictions...

> I am wondering why there needs to be a separate event store when it sounds like its purpose can be fulfilled by Kafka itself, using log compaction or log retention configured for permanent storage.

So if we want an event stream with no internal contradictions, how do we achieve that? One way is to ensure that only a single process has permission to modify the stream. Unfortunately, that leaves you with a single point of failure -- the process dies, and everything comes to an end.

On the other hand, if you have multiple processes updating the same stream, then you run the risk of concurrent writes, data races, and contradictions being introduced because one writer couldn't yet see what the other one did.

With an RDBMS or an Event Store, we can solve this problem using transactions, or compare-and-swap semantics; an attempt to extend the stream with new events is rejected if there has been a concurrent modification.
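A minimal in-memory sketch of that compare-and-swap idea (all names are made up, not any particular event store's API): the writer states the stream version it last saw, and the append is rejected if another writer has extended the stream in the meantime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of expected-version (compare-and-swap) appends.
public class ExpectedVersionAppend {

    static class ConcurrencyException extends RuntimeException {
        ConcurrencyException(String msg) { super(msg); }
    }

    private final Map<String, List<String>> streams = new HashMap<>();

    // The append only succeeds if the stream still has exactly `expectedVersion` events.
    public synchronized void append(String streamId, long expectedVersion, List<String> newEvents) {
        List<String> stream = streams.computeIfAbsent(streamId, id -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            throw new ConcurrencyException(
                "Expected version " + expectedVersion + " but stream is at " + stream.size());
        }
        stream.addAll(newEvents);
    }

    public static void main(String[] args) {
        ExpectedVersionAppend store = new ExpectedVersionAppend();
        store.append("cart-42", 0, List.of("ItemAdded"));
        try {
            // A second writer that based its decision on version 0 is rejected.
            store.append("cart-42", 0, List.of("ItemRemoved"));
        } catch (ConcurrencyException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In a real store the version check and the append would happen inside one transaction or conditional write; the `synchronized` block here merely stands in for that.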

Furthermore, because of its DDD heritage, it is common for the durable store to be divided into many very fine-grained partitions (aka "aggregates"). A single shopping cart might reasonably have four streams dedicated to it.

If Kafka lacks those capabilities, then it is going to be a lousy replacement for an event store. KAFKA-2260 has been open for more than four years now, so we seem to be lacking the first. From what I've been able to discern from the Kafka literature, it isn't happy about fine-grained streams either (although it's been a while since I checked; perhaps things have changed).

See also: Jesper Hammarbäck writing about this 18 months ago, and reaching similar conclusions to those expressed here.

4
votes

Kafka can be used as a DDD event store, but there are some complications if you do so, due to features it is missing.

Two key features that people use with event sourcing of aggregates are:

  1. Load an aggregate, by reading the events for just that aggregate
  2. When concurrently writing new events for an aggregate, ensure only one writer succeeds, to avoid corrupting the aggregate and breaking its invariants.

Kafka currently can't do either of these. 1 fails because you generally need to have one stream per aggregate type (it doesn't scale to one stream per aggregate, and this wouldn't necessarily be desirable anyway), so there's no way to load just the events for one aggregate; 2 fails because https://issues.apache.org/jira/browse/KAFKA-2260 has not been implemented.
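For context, capability 1 is usually just a left fold over one aggregate's events. A small sketch (the event types and cart logic are invented purely for illustration) of what that rehydration looks like when the store *can* hand you a single aggregate's stream:

```java
import java.util.List;

// Rehydrating one aggregate by folding over only that aggregate's events.
public class LoadAggregate {

    // A trivial event hierarchy for illustration only.
    sealed interface CartEvent permits ItemAdded, ItemRemoved {}
    record ItemAdded(String sku) implements CartEvent {}
    record ItemRemoved(String sku) implements CartEvent {}

    // The aggregate state, rebuilt purely from its events.
    static class ShoppingCart {
        private final java.util.Set<String> items = new java.util.HashSet<>();

        void apply(CartEvent event) {
            if (event instanceof ItemAdded e) items.add(e.sku());
            else if (event instanceof ItemRemoved e) items.remove(e.sku());
        }

        static ShoppingCart fromEvents(List<CartEvent> events) {
            ShoppingCart cart = new ShoppingCart();
            events.forEach(cart::apply);   // left fold over the stream
            return cart;
        }

        java.util.Set<String> items() { return items; }
    }

    public static void main(String[] args) {
        List<CartEvent> stream = List.of(
            new ItemAdded("book"), new ItemAdded("pen"), new ItemRemoved("pen"));
        System.out.println(ShoppingCart.fromEvents(stream).items()); // [book]
    }
}
```

With one topic per aggregate type, the events of all carts are interleaved, so there is no cheap way to obtain just the `cart-42` slice to feed into such a fold.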

So you have to write the system in such a way that capabilities 1 and 2 aren't needed. This can be done as follows (a rough sketch of the resulting command-processing loop follows the list):

  1. Rather than invoking command handlers directly, write the commands to streams. Have a command stream per aggregate type, sharded by aggregate id (these don't need permanent retention). This ensures that you only ever process a single command for a particular aggregate at a time.
  2. Write snapshotting code for all your aggregate types
  3. When processing a command message, do the following:
    1. Load the aggregate snapshot
    2. Validate the command against it
    3. Write the new events (or return failure)
    4. Apply the events to the aggregate
    5. Save a new aggregate snapshot, including the current stream offset for the event stream
    6. Return success to the client (via a reply message perhaps)
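Here is the rough sketch of that loop referred to above. Every type and method name is a hypothetical stand-in (there is no real framework behind `SnapshotStore`, `EventWriter`, or `ReplySender`); it only shows how steps 1-6 fit together:

```java
import java.util.List;

// Sketch of the command-processing loop (steps 1-6); all names are placeholders.
public class CommandProcessor {

    interface SnapshotStore {
        Snapshot load(String aggregateId);                     // step 1
        void save(String aggregateId, Snapshot snapshot);      // step 5
    }
    interface EventWriter {
        long append(String aggregateId, List<Object> events);  // step 3, returns new offset
    }
    interface ReplySender {
        void success(String commandId);                        // step 6
        void failure(String commandId, String reason);
    }

    record Snapshot(Object state, long streamOffset) {}
    record Command(String id, String aggregateId, Object payload) {}

    private final SnapshotStore snapshots;
    private final EventWriter events;
    private final ReplySender replies;

    CommandProcessor(SnapshotStore snapshots, EventWriter events, ReplySender replies) {
        this.snapshots = snapshots;
        this.events = events;
        this.replies = replies;
    }

    void handle(Command command) {
        Snapshot snapshot = snapshots.load(command.aggregateId());        // 1. load snapshot
        List<Object> newEvents = decide(snapshot.state(), command);       // 2. validate command
        if (newEvents == null) {
            replies.failure(command.id(), "command rejected");            // 3. or return failure
            return;
        }
        long offset = events.append(command.aggregateId(), newEvents);    // 3. write new events
        Object newState = applyAll(snapshot.state(), newEvents);          // 4. apply events
        snapshots.save(command.aggregateId(), new Snapshot(newState, offset)); // 5. save snapshot + offset
        replies.success(command.id());                                    // 6. reply to the client
    }

    // Domain-specific logic would live in these two hooks.
    private List<Object> decide(Object state, Command command) { return List.of(); }
    private Object applyAll(Object state, List<Object> events) { return state; }

    public static void main(String[] args) {
        CommandProcessor processor = new CommandProcessor(
            new SnapshotStore() {
                public Snapshot load(String id) { return new Snapshot("initial", 0L); }
                public void save(String id, Snapshot s) { System.out.println("saved " + s); }
            },
            (aggregateId, newEvents) -> newEvents.size(),   // pretend append
            new ReplySender() {
                public void success(String id) { System.out.println("ok " + id); }
                public void failure(String id, String reason) { System.out.println("fail " + id); }
            });
        processor.handle(new Command("cmd-1", "cart-42", "AddItem"));
    }
}
```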

The only other problem is handling failures (such as the snapshotting failing). This can be handled during startup of a particular command-processing partition: it simply needs to replay any events written since the last successful snapshot, and update the corresponding snapshots before resuming command processing.

Kafka Streams appears to have the features to make this very simple - you have a KStream of commands that you transform into a KTable (containing snapshots, keyed by aggregate id) and a KStream of events (and possibly another stream containing responses). Kafka allows all this to work transactionally, so there is no risk of failing to update the snapshot. It will also handle migrating partitions to new servers, etc. (automatically loading the snapshot KTable into a local RocksDB when this happens).
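A simplified sketch of that topology shape, assuming string-encoded commands and snapshots keyed by aggregate id. The topic names and the aggregation lambda are placeholders; a real implementation would validate each command against the snapshot and emit proper domain events (for example via a processor with a state store) rather than forwarding the snapshot changelog:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// Commands keyed by aggregate id -> per-aggregate snapshot KTable -> events topic.
public class SnapshotTopology {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Commands arrive keyed by aggregate id.
        KStream<String, String> commands =
            builder.stream("commands", Consumed.with(Serdes.String(), Serdes.String()));

        // Fold commands into a per-aggregate snapshot, backed by a local state store (RocksDB).
        KTable<String, String> snapshots = commands
            .groupByKey()
            .aggregate(
                () -> "",                                                      // initial snapshot
                (aggregateId, command, snapshot) -> snapshot + "|" + command,  // placeholder logic
                Materialized.with(Serdes.String(), Serdes.String()));

        // Forward the resulting changes to an events topic (stand-in for the real event stream).
        snapshots.toStream().to("events", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```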

1
votes

> There is the idea of an event store and a message queue such as Apache Kafka, with events flowing from event store => Kafka Connect JDBC/Debezium CDC => Kafka

In the essence of DDD-flavoured event sourcing, there's no place for message queues as such. One of the DDD tactical patterns is the aggregate pattern, which serves as a transactional boundary. DDD doesn't care how the aggregate state is persisted, and usually people use state-based persistence with relational or document databases. When applying event-based persistence, we need to store new events as one transaction to the event store, in such a way that we can retrieve those events later in order to reconstruct the aggregate state. Thus, to support DDD-style event sourcing, the store needs to be able to index events by the aggregate id. This is usually expressed as the concept of the event stream: a stream is uniquely identified by the aggregate identifier, all of its events are stored in order, and so the stream represents a single aggregate.

Because we can rarely live with a database that only allows us to retrieve a single entity by its id, we need some place to project those events into, so we can have a queryable store. That is what your diagram shows on the right side, as materialised views. More often, it is called the read side, and the models there are called read-models. That kind of store doesn't have to keep snapshots of aggregates. Quite the opposite: read-models serve to represent the system state in a way that can be directly consumed by the UI/API, and they often don't match the domain model as such.

As mentioned in one of the answers here, the typical command handler flow is:

  1. Load one aggregate state by id, by reading all events for that aggregate. This already requires the event store to support that kind of load, which Kafka cannot do.
  2. Call the domain model (aggregate root method) to perform some action.
  3. Store new events to the aggregate stream, all or none.

If you now start to write events to the store and publish them somewhere else, you get a two-phase-commit issue, which is hard to solve. So, we usually prefer using products like EventStore, which has the ability to create a catch-up subscription for all written events. Kafka supports that too. It is also beneficial to be able to create new event indexes in the store, linking to existing events, especially if you have several systems using one store. In EventStore this can be done using internal projections; you can also do it with Kafka Streams.

I would argue that indeed you don't need any messaging system between write and read sides. The write side should allow you to subscribe to the event feed, starting from any position in the event log, so you can build your read-models.
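As an illustration of that kind of catch-up subscription on the Kafka side, here is a sketch of a read-side projector that resumes from a remembered offset. The topic name, partition choice, and toy projection are all assumptions, and in practice the last processed offset would be persisted together with the read-model rather than held in a local variable:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Catch-up subscription: resume reading the event log from a stored position
// and keep projecting events into a read-model.
public class CatchUpProjector {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        Map<String, Long> readModel = new HashMap<>(); // toy read-model: event type -> count
        long lastProcessedOffset = 0L;                 // would normally be persisted with the read-model

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("events", 0);
            consumer.assign(List.of(partition));
            consumer.seek(partition, lastProcessedOffset); // start from any position in the log

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    readModel.merge(record.value(), 1L, Long::sum); // project the event
                    lastProcessedOffset = record.offset() + 1;
                }
            }
        }
    }
}
```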

However, Kafka only really works for systems that don't use the aggregate pattern, because it is essential to be able to use the events, not a snapshot, as the source of truth, although that is of course debatable. Consider the possibility of needing to change the way events affect the entity state (fixing a bug, for example): when you use events to reconstruct the entity state, you will be just fine, but the snapshots will stay the same and you'll need to apply correction events to fix all of them.

I personally also prefer not to be tightly coupled to any infrastructure in my domain model; in fact, my domain models have zero dependencies on the infrastructure. By bringing the snapshotting logic into the Kafka Streams builder, I would be immediately coupled to it, and from my point of view that is not the best solution.