4
votes

I am trying to learn more about CQRS and Event Sourcing (Event Store).

My understanding is that a message queue/bus is not normally used in this scenario - a message bus can be used to facilitate communication between microservices, but it is not typically used specifically for CQRS. However, the way I see it at the moment, a message bus would be very useful for guaranteeing that the read model is eventually in sync (hence eventual consistency), e.g. when the server hosting the read model database is brought back online.

I understand that eventual consistency is often acceptable with CQRS. My question is: how does the read side know it is out of sync with the write side? For example, let's say there are 2,000,000 events created in Event Store on a typical day and 1,999,050 are also written to the read store. The remaining 950 events are not written because of a software bug somewhere, or because the server hosting the read model is offline for a few seconds, etc. How does eventual consistency work here? How does the application know to replay the 950 events that are missing at the end of the day, or the x events that were missed because of the downtime ten minutes ago?

I have read questions on here over the last week or so that talk about messages being replayed from the event store, e.g. this one: CQRS - Event replay for read side. However, none of them talk about how this is done. Do I need to set up a scheduled task that runs once per day and replays all events that were created since the date the scheduled task last succeeded? Is there a more elegant approach?


4 Answers

3
votes

I've used two approaches in my projects, depending on the requirements:

  1. Synchronous, in-process Readmodels. After the events are persisted, in the same request lifetime, in the same process, the Readmodels are fed with those events. In case of a Readmodel's failure (bug or catchable error/exception) the error is logged and that Readmodel is just skipped and the next Readmodel is fed with the events and so on. Then follow the Sagas, that may generate commands that generate more events and the cycle is repeated.

I use this approach when the impact of a Readmodel's failure is acceptable to the business, i.e. when having a Readmodel's data ready immediately matters more than the risk of failure. For example, the business wanted the data immediately available in the UI.

The error log should be easily accessible on some admin panel so that someone can look at it when a client reports an inconsistency between the write/command side and the read/query side.

This also works if you have your Readmodels coupled to each other, i.e. one Readmodel needs data from another canonical Readmodel. Although this seems bad, it isn't necessarily; it always depends. There are cases where you trade duplication of updater code/logic against resilience.

  2. Asynchronous, in-another-process Readmodel updater. I use this when a Readmodel is totally separated from the other Readmodels, so that one Readmodel's failure would not bring the whole read side down, or when a Readmodel needs another language, different from the monolith. Basically this is a microservice. When something bad happens inside a Readmodel, it is necessary that some authoritative higher-level component is notified, i.e. an Admin is notified by email or SMS or whatever.

The Readmodel should also have a status panel with all kinds of metrics about the events it has processed: whether there are gaps, errors or warnings. It should also have a command panel where an Admin can rebuild it at any time, preferably without system downtime.

With either approach, the Readmodels should be easily rebuildable.
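
As a rough illustration of the first (synchronous, in-process) approach, here is a minimal Python sketch of feeding several Readmodels and isolating failures. The `dispatch` function, the event shape and the handler names are all hypothetical, made up for this example; a real system would also run the Sagas afterwards.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger("readmodels")

Event = dict  # hypothetical event shape: {"type": ..., "data": ...}


def dispatch(events: List[Event],
             read_models: Dict[str, Callable[[Event], None]]) -> List[str]:
    """Feed every read model; log and skip the ones that fail.

    Returns the names of the read models that failed, so they can be
    shown on an admin panel and rebuilt later.
    """
    failed = []
    for name, handle in read_models.items():
        try:
            for event in events:
                handle(event)
        except Exception:
            # One read model failing must not stop the others.
            logger.exception("read model %s failed; skipping it", name)
            failed.append(name)
    return failed


# Usage: one buggy read model, one healthy one.
totals = {"count": 0}


def count_orders(event: Event) -> None:
    if event["type"] == "OrderPlaced":
        totals["count"] += 1


def broken(event: Event) -> None:
    raise ValueError("bug in projection")


failed = dispatch([{"type": "OrderPlaced", "data": {}}],
                  {"broken": broken, "order_count": count_orders})
```

The healthy Readmodel is still updated even though the buggy one raised, and the returned list tells you which Readmodels need attention.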

How would you choose between a pull approach and a push approach? Would you use a message queue with a push (events)?

I prefer the pull based approach because:

  • it does not use another stateful component like a message queue, which is another thing that must be managed, that consumes resources and that can (so it will) fail
  • every Readmodel consumes the events at the rate it wants
  • every Readmodel can easily change at any moment what event types it consumes
  • every Readmodel can easily be rebuilt at any time by requesting all the events from the beginning
  • the order of events is exactly the same as in the source of truth, because you pull from the source of truth
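
To make these points concrete, here is a small Python sketch of two Readmodels pulling from the same log, each with its own batch size and its own set of event types. The in-memory `LOG`, `read_after` and `PullingReadModel` are hypothetical stand-ins, not any real event store API.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

# Hypothetical in-memory log standing in for the event store:
# (sequence number, event type) pairs in commit order.
LOG: List[Tuple[int, str]] = [
    (1, "OrderPlaced"),
    (2, "OrderShipped"),
    (3, "OrderPlaced"),
    (4, "OrderCancelled"),
]


def read_after(position: int, limit: int) -> List[Tuple[int, str]]:
    """Return up to `limit` events with a sequence number above `position`."""
    return [e for e in LOG if e[0] > position][:limit]


@dataclass
class PullingReadModel:
    wanted: Set[str]          # event types this read model cares about
    batch_size: int           # how many events it pulls per poll
    position: int = 0         # last sequence number it has seen
    seen: List[int] = field(default_factory=list)

    def poll(self) -> int:
        """Pull one batch; return how many events were read."""
        batch = read_after(self.position, self.batch_size)
        for seq, event_type in batch:
            if event_type in self.wanted:
                self.seen.append(seq)
            # Advance past ignored events too, so they are not re-read.
            self.position = seq
        return len(batch)


# Usage: a slow, selective read model and a fast, greedy one.
placed = PullingReadModel(wanted={"OrderPlaced"}, batch_size=1)
everything = PullingReadModel(
    wanted={"OrderPlaced", "OrderShipped", "OrderCancelled"}, batch_size=10)
everything.poll()        # reads the whole log in one go
while placed.poll():     # catches up one event at a time
    pass
```

Rebuilding either Readmodel is just a matter of clearing its data and setting `position` back to 0.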

There are cases when I would choose a message queue:

  • you need the events to be available even if the Event store is not
  • you need competing/parallel consumers
  • you don't want to track what messages you consume; as they are consumed they are removed automatically from the queue
2
votes

This talk from Greg Young may help.

How does the application know to replay the 950 events that are missing at the end of the day or the x events that were missed because of the downtime ten minutes ago?

So there are two different approaches here.

One is perhaps simpler than you expect - each time you need to rebuild a read model, just start from event 0 in the stream.

Yeah, the scale on that will eventually suck, so you won't want that to be your first strategy. But notice that it does work.

For updates with not-so-embarrassing scaling properties, the usual idea is that the read model tracks metadata about the stream position used to construct the previous model. Thus, the query from the read model becomes "What has happened since event #1,999,050?"

In the case of event store, the call might look something like

EventStore.ReadStreamEventsForwardAsync(stream, 1999050, 100, false)
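
Since a call like that returns one page of events at a time, the read model would normally loop until it reaches the end of the stream. Here is a minimal Python sketch of that loop against an in-memory list; `STREAM`, `read_forward` and `catch_up` are hypothetical names for this example and are not the Event Store client API. The sketch assumes the checkpoint stores the number of the last event applied, so reading starts at the next one.

```python
from typing import Callable, List

# Hypothetical stand-in for a stream: event numbers 0..24.
STREAM: List[int] = list(range(25))


def read_forward(start: int, count: int) -> List[int]:
    """Return up to `count` events beginning at event number `start`."""
    return STREAM[start:start + count]


def catch_up(checkpoint: int, page_size: int,
             apply: Callable[[int], None]) -> int:
    """Apply every event after `checkpoint`, one page at a time.

    Returns the new checkpoint (the number of the last event applied).
    Pass -1 as the checkpoint when nothing has been processed yet.
    """
    next_event = checkpoint + 1
    while True:
        page = read_forward(next_event, page_size)
        if not page:
            return next_event - 1
        for event in page:
            apply(event)
        next_event += len(page)


# Usage: the read model last saw event 19 and asks what happened since.
applied: List[int] = []
new_checkpoint = catch_up(checkpoint=19, page_size=2, apply=applied.append)
```

Starting from a checkpoint of -1 replays the whole stream, which is the "start from event 0" strategy described above.
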
1
votes

The application doesn't know that it hasn't processed some events due to a bug.

First of all, I don't understand why you assume that the number of events written on the write side must equal the number of events processed by the read side. Several projections may subscribe to the same event, and some events may have no subscriptions on the read side at all.

If a bug in a projection or in the infrastructure leaves a certain projection invalid, you might need to rebuild that projection. In most cases this would be a manual intervention that resets the projection's checkpoint to 0 (the beginning of time), so that the projection picks up all events from the event store and reprocesses them from scratch.

1
votes

The event store should have a global sequence number across all events starting, say, at 1.

Each projection has a position tracking where it is along the sequence number. The projections are like logical queues.

You can clear a projection's data and reset the position back to 0 and it should be rebuilt.

In your case the projection fails for some reason, like the server going offline, at position 1,999,050, but when the server starts up again it will continue from that point.
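
A minimal Python sketch of this idea, assuming an in-memory list in place of the event store; `EVENTS`, `BalanceProjection` and the `fail_at` parameter are hypothetical and exist only to simulate a crash:

```python
from typing import Dict, List, Tuple

# Hypothetical global log: (global sequence number starting at 1,
# account, amount).
EVENTS: List[Tuple[int, str, int]] = [
    (1, "a", 10),
    (2, "b", 5),
    (3, "a", -3),
    (4, "b", 7),
]


class BalanceProjection:
    def __init__(self) -> None:
        self.position = 0                    # last sequence number applied
        self.balances: Dict[str, int] = {}

    def run(self, fail_at: int = 0) -> None:
        """Apply all events after self.position.

        `fail_at` simulates the server going offline just before that
        sequence number is applied.
        """
        for seq, account, amount in EVENTS:
            if seq <= self.position:
                continue                     # already processed
            if seq == fail_at:
                raise RuntimeError("server went offline")
            self.balances[account] = self.balances.get(account, 0) + amount
            # In a real store, persist the position together with the
            # data so the two cannot drift apart.
            self.position = seq

    def reset(self) -> None:
        """Clear the data and rewind, so the next run rebuilds everything."""
        self.balances.clear()
        self.position = 0


# Usage: crash at event 3, then resume from the stored position.
projection = BalanceProjection()
try:
    projection.run(fail_at=3)
except RuntimeError:
    pass
projection.run()      # continues with events 3 and 4

# Rebuild from scratch, e.g. after fixing a bug in the projection.
projection.reset()
projection.run()
```

Both the resumed run and the rebuild end with the same balances, which is what makes the position a safe thing to reset.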