
I'm using Cassandra and Kafka for event sourcing, and it works quite well. But I've just recently discovered a potentially major flaw in the design/setup. A brief intro to how it is done:

  1. The aggregate command handler is basically a Kafka consumer, which consumes messages of interest on a topic:

    1.1 When it receives a command, it loads all events for the aggregate and replays the aggregate event handler for each event to bring the aggregate up to its current state.

    1.2 Based on the command and business logic, it then appends one or more events to the event store. This involves inserting the new event(s) into the event store table in Cassandra. The events are stamped with a version number for the aggregate, starting at version 0 for a new aggregate, which makes projections possible. In addition, it publishes the event to another topic (for projection purposes).

    1.3 A Kafka consumer listens on the topic to which these events are published. This consumer acts as a projector: when it receives an event of interest, it loads the current read model for the aggregate, checks that the version of the received event is the expected one, and then updates the read model (sketched just below this list).
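To make step 1.3 concrete, here is a minimal Java sketch of the projector's expected-version check. All names here (OrderEvent, OrderReadModel, ReadModelStore) are made up for illustration and stand in for whatever maps to the read-model table:

    import java.util.Optional;

    // Illustrative types; the real ones would map to the Cassandra read-model table.
    interface ReadModelStore {
        Optional<OrderReadModel> load(String aggregateId);
        void save(OrderReadModel model);
    }

    record OrderEvent(String aggregateId, long version, String payload) {}

    record OrderReadModel(String aggregateId, long version, String state) {
        OrderReadModel apply(OrderEvent e) {
            // Fold the event into the read model and bump the version.
            return new OrderReadModel(aggregateId, e.version(), state + "|" + e.payload());
        }
    }

    class OrderProjector {
        private final ReadModelStore store;

        OrderProjector(ReadModelStore store) { this.store = store; }

        void onEvent(OrderEvent event) {
            OrderReadModel model = store.load(event.aggregateId())
                    .orElse(new OrderReadModel(event.aggregateId(), -1, ""));
            long expected = model.version() + 1;     // events start at version 0
            if (event.version() < expected) return;  // duplicate delivery, already applied
            if (event.version() > expected)          // gap: an earlier event is missing
                throw new IllegalStateException(
                        "expected v" + expected + ", got v" + event.version());
            store.save(model.apply(event));
        }
    }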

This seems to work very well. The problem is when I want what EventStore calls category projections. Take an Order aggregate as an example: I can easily project one or more read models per Order. But if I want, for example, a projection containing a customer's 30 most recent orders, then I would need a category projection.

I'm just scratching my head over how to accomplish this. I'm curious whether anyone else is using Cassandra and Kafka for event sourcing. I've read in a couple of places that some people discourage it; maybe this is the reason.

I know EventStore has support for this built in. Maybe using Kafka as the event store would be a better solution.

What granularity do your event topics have? Is there one topic per aggregate type or one topic per aggregate instance? Given that Kafka doesn't scale to millions of topics, the former is the normal approach, which means you already have your category ready to go. – TomW
One topic per aggregate type, but 3 partitions, and 2 instances of the application (meaning two consumers in the same consumer group). Now I've been thinking of a way to create a "global" event version per aggregate type: if I send the aggregate events to a topic (one per aggregate type) with only one partition, I can consume it, stamp the events with a global version, and output the versioned events to another topic. I was then thinking of having one consumer group per projection for this topic and storing the position of each projection in a database. But this will fail with 3 partitions. – user2814881
The only way I can see right now is to have only one partition on the topic that the projection consumers listen to. Not sure if this is best practice though. – user2814881

1 Answer


With this kind of architecture, you have to choose between:

  • Global event stream per type - simple
  • Partitioned event stream per type - scalable

Unless your system has fairly high throughput (say, at least tens or hundreds of events per second sustained for the stream type in question), the global stream is the simpler approach. Some systems (such as Event Store) give you the best of both worlds: very fine-grained streams (such as per aggregate instance) that can be combined into larger streams (per stream type/category/partition, across multiple stream types, etc.) in a performant and predictable way out of the box, while remaining simple because you only have to track a single global event position.

If you go partitioned with Kafka:

  • Your projection code will need to handle concurrent consumer groups accessing the same read models when events from different partitions feed into the same models. Depending on the projection's target store, there are plenty of ways to handle this (transactions, optimistic concurrency, atomic operations, etc.), but it would be a problem for some target stores.
  • Your projection code will need to keep track of the stream position of each partition, not just a single position (see the sketch after this list). If your projection reads from multiple streams, it has to keep track of many positions.
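To illustrate the second point, a per-partition position tracker could look roughly like this (a sketch with made-up names; a real implementation would persist the positions alongside the read model, ideally in the same write, rather than in memory):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Tracks the last applied offset per topic-partition, so each partition's
    // progress is checkpointed independently.
    class PartitionedCheckpoints {
        private final Map<String, Long> positions = new ConcurrentHashMap<>();

        boolean alreadyApplied(String topic, int partition, long offset) {
            long last = positions.getOrDefault(topic + "-" + partition, -1L);
            return offset <= last;
        }

        void record(String topic, int partition, long offset) {
            positions.put(topic + "-" + partition, offset);
        }
    }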

Using a global stream removes both of those concerns - performance is usually good enough.

In either case, you'll likely also want to get the stream position into the long-term event storage (i.e. Cassandra). You could do this with a dedicated process that reads from the event stream (partitioned or global) and updates the events in Cassandra with the global or partition position of each event, as sketched below. (I do a similar thing with MongoDB: a process reads the 'oplog' and copies oplog timestamps into events, since oplog timestamps are totally ordered.)
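A hedged sketch of such a position-stamping process, using the plain Kafka consumer and the DataStax Java driver. The topic, table, and column names are assumptions, and the version-extraction helper is hypothetical:

    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.cql.SimpleStatement;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;

    public class PositionStamper {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "position-stamper");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                 CqlSession session = CqlSession.builder().build()) {
                consumer.subscribe(List.of("order-events")); // assumed topic name
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> r : records) {
                        // Copy the Kafka offset into the event row as its stream position.
                        // Key = aggregate id; the aggregate version is assumed to be
                        // recoverable from the payload.
                        session.execute(SimpleStatement.newInstance(
                                "UPDATE event_store SET stream_position = ? " +
                                "WHERE aggregate_id = ? AND version = ?",
                                r.offset(), r.key(), extractVersion(r.value())));
                    }
                }
            }
        }

        // Hypothetical helper: assumes a "version:payload" encoding, purely for illustration.
        static long extractVersion(String payload) {
            return Long.parseLong(payload.substring(0, payload.indexOf(':')));
        }
    }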

Another option is to drop Cassandra from the initial command processing and use Kafka Streams instead (sketched after this list):

  • Partitioned command stream is processed by joining with a partitioned KTable of aggregates
  • Command result and events are computed
  • Atomically, the KTable is updated with the changed aggregate, the events are written to the event stream, and the command response is written to the command response stream.
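A hedged sketch of that topology, assuming string-serialized commands and aggregates and made-up topic names (a real version would need a serde for the result type, which is omitted here):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    import java.util.List;
    import java.util.Properties;

    public class CommandProcessor {

        // Illustrative result type carrying everything a command produces.
        record CommandResult(String newAggregateState, List<String> events, String response) {}

        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();

            // Aggregates as a partitioned KTable, co-partitioned with the command stream.
            KTable<String, String> aggregates = builder.table("order-aggregates");
            KStream<String, String> commands = builder.stream("order-commands");

            // Join each command with the current aggregate state and compute the result.
            KStream<String, CommandResult> results =
                    commands.leftJoin(aggregates, CommandProcessor::handle);

            // Fan the result out. Writing to the KTable's source topic updates the table;
            // with exactly-once processing the three writes commit atomically.
            results.mapValues(CommandResult::newAggregateState).to("order-aggregates");
            results.flatMapValues(CommandResult::events).to("order-events");
            results.mapValues(CommandResult::response).to("order-command-responses");

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-command-processor");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
            new KafkaStreams(builder.build(), props).start();
        }

        static CommandResult handle(String command, String aggregate) {
            // Hypothetical domain logic: validate the command against the current
            // state, then return the new state, the new events, and a response.
            throw new UnsupportedOperationException("domain logic goes here");
        }
    }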

You would then have a downstream event processor that copies the events into Cassandra for easier querying etc. (and which can add the Kafka stream position to each event as it does so, giving you the category ordering). This can help with catch-up subscriptions etc. if you don't want to use Kafka for long-term event storage. (To catch up, you'd read as far as you can from Cassandra and then switch to streaming from Kafka from the position of the last Cassandra event, as sketched below.) On the other hand, Kafka itself can store events forever, so this isn't always necessary.
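For example, the catch-up logic might look something like this sketch (the table and topic names are assumptions, as is the premise that the global stream lives in a single partition):

    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.cql.Row;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.List;
    import java.util.function.Consumer;

    class CatchUpSubscription {
        static void run(CqlSession session, KafkaConsumer<String, String> consumer,
                        Consumer<String> handler) {
            long lastPosition = -1L;

            // 1. Replay everything already copied into Cassandra, in position order.
            for (Row row : session.execute(
                    "SELECT stream_position, payload FROM events_by_position")) {
                handler.accept(row.getString("payload"));
                lastPosition = row.getLong("stream_position");
            }

            // 2. Switch to live streaming from Kafka, starting just after the
            // last event seen in Cassandra.
            TopicPartition tp = new TopicPartition("order-events", 0); // global stream: 1 partition
            consumer.assign(List.of(tp));
            consumer.seek(tp, lastPosition + 1);
            while (true) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                    handler.accept(r.value());
                }
            }
        }
    }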

I hope this helps a bit with understanding the tradeoffs and problems you might encounter.