I am trying to implement event sourcing using Kafka.

My vision for the stream processor application is a typical 3-layer Spring application in which:

  • The "presentation" layer is replaced by (implemented by?) the Kafka Streams API.
  • The business logic layer is utilized by the Processor API in the topology.
  • The DB is a relational, in-memory H2 database which is accessed via Spring Data JPA repositories. The repositories also implement the interfaces needed to register them as Kafka state stores, to get the benefits (restoration & fault tolerance).

But I'm wondering how I should implement the custom state store part.

I have been searching, and:

  • There are some interfaces such as StateStore & StoreBuilder. StoreBuilder has a withLoggingEnabled() method, but if I enable it, when does the actual update & change logging happen? Usually the examples are all key-value stores, even the custom ones. What if I don't want key-value? The example in the interactive queries section of the Kafka documentation just doesn't cut it.

  • I am aware of interactive queries. But they seem to be good for queries & not updates, as the name suggests.

In a key-value store, the records that are sent to the changelog are straightforward. But if I don't use key value; when & how do I inform kafka that my state has changed?

1 Answer

You will need to implement StateStore for the actual store engine you want to use. This interface does not dictate anything about the store, and you can do whatever you want.

You also need to implement a StoreBuilder that acts as a factory to create instances of your custom store.

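import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;

class MyCustomStore implements StateStore {
    // define any interface you want to present to the user of the store
    // (the StateStore methods themselves, e.g. name(), init(), flush(), close(), are omitted here)
}

class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore> {
    @Override
    public MyCustomStore build() {
        // create a new instance of MyCustomStore and return it
        return new MyCustomStore();
    }

    // all other methods (except `name()`) are optional
    // e.g., you can do a dummy implementation that only returns `this`
}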

Compare: https://docs.confluent.io/current/streams/developer-guide/processor-api.html#implementing-custom-state-stores

But if I don't use key value; when & how do I inform kafka that my state has changed?

If you want to implement withLoggingEnabled() (similar for caching), you will need to implement this logging (or caching) as part of your store. Because Kafka Streams does not know how your store works, it cannot provide an implementation for this. Thus, it's your design decision whether your store supports logging into a changelog topic or not. If you want to support logging, you need to come up with a design that maps store updates to key-value pairs (you can also write multiple pairs per update) that you can write into a changelog topic and that allows you to recreate the state when reading those records from the changelog topic.
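To make the mapping idea concrete, here is a minimal plain-Java sketch with no Kafka dependency. Everything in it is hypothetical: AccountTable stands in for a store whose API is not key-value, ChangeRecord for a changelog record, and an in-memory list for the changelog topic. It only illustrates one possible design, in which each logical update is translated into one or more key-value records (with a null value as a tombstone) and the state is rebuilt by replaying them in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One changelog entry; a null value is a tombstone ("row deleted").
final class ChangeRecord {
    final String key;
    final Long value;

    ChangeRecord(String key, Long value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical store whose API is not key-value: accounts with a transfer operation.
class AccountTable {
    private final Map<String, Long> rows = new HashMap<>();
    private final List<ChangeRecord> changelog; // stand-in for a changelog topic

    AccountTable(List<ChangeRecord> changelog) {
        this.changelog = changelog;
    }

    void open(String account, long balance) {
        write(account, balance);
    }

    // One logical update touches two rows, so it emits two changelog records.
    void transfer(String from, String to, long amount) {
        if (!rows.containsKey(from) || !rows.containsKey(to)) {
            throw new IllegalArgumentException("unknown account");
        }
        write(from, rows.get(from) - amount);
        write(to, rows.get(to) + amount);
    }

    void close(String account) {
        write(account, null);
    }

    Long balance(String account) {
        return rows.get(account);
    }

    // Every mutation goes through here: apply it locally, then log it.
    private void write(String key, Long value) {
        apply(key, value);
        changelog.add(new ChangeRecord(key, value));
    }

    private void apply(String key, Long value) {
        if (value == null) {
            rows.remove(key);
        } else {
            rows.put(key, value);
        }
    }

    // Restoration: replay the records in order into an empty table, without re-logging them.
    static AccountTable restore(List<ChangeRecord> records) {
        AccountTable table = new AccountTable(new ArrayList<>());
        for (ChangeRecord r : records) {
            table.apply(r.key, r.value);
        }
        return table;
    }
}
```

Opening two accounts, transferring between them, and closing one produces five records in this sketch; passing that list to AccountTable.restore yields a table with the same balances. In a real store, the list would be replaced by writes to (and reads from) the changelog topic.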

Getting a fault-tolerant store is not only possible via change logging. For example, you could also plug in a remote store that does replication etc. internally, and thus rely on the store's fault-tolerance capabilities instead of using change logging. Of course, using a remote store implies other challenges compared to using a local store.

For the Kafka Streams default stores, logging and caching are implemented as wrappers for the actual store, making them easily pluggable. But you can implement this in any way that fits your store best. For comparison, you might want to check out how the built-in key-value store implements these wrappers.
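As a rough illustration of the wrapper idea (these are not the actual Kafka Streams classes), the sketch below uses a hypothetical TagStore interface, a plain inner store that knows nothing about logging, and a ChangeLoggingTagStore decorator that records every mutation as a {key, value} pair before the caller sees the result. The in-memory list again stands in for a changelog topic, and the "present" marker value is an arbitrary choice.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical store interface, deliberately not key-value shaped.
interface TagStore {
    void add(String tag);
    void remove(String tag);
    boolean contains(String tag);
}

// The actual store engine; it knows nothing about logging.
class InMemoryTagStore implements TagStore {
    private final Set<String> tags = new HashSet<>();

    @Override
    public void add(String tag) {
        tags.add(tag);
    }

    @Override
    public void remove(String tag) {
        tags.remove(tag);
    }

    @Override
    public boolean contains(String tag) {
        return tags.contains(tag);
    }
}

// Wrapper: delegates every call and records each mutation as a {key, value} pair.
class ChangeLoggingTagStore implements TagStore {
    private final TagStore inner;
    private final List<String[]> changelog; // stand-in for a changelog topic

    ChangeLoggingTagStore(TagStore inner, List<String[]> changelog) {
        this.inner = inner;
        this.changelog = changelog;
    }

    @Override
    public void add(String tag) {
        inner.add(tag);
        changelog.add(new String[] {tag, "present"});
    }

    @Override
    public void remove(String tag) {
        inner.remove(tag);
        changelog.add(new String[] {tag, null}); // null value acts as a tombstone
    }

    @Override
    public boolean contains(String tag) {
        return inner.contains(tag); // reads are passed through and not logged
    }
}
```

Because the wrapper and the inner store share one interface, callers can be handed either `new InMemoryTagStore()` or `new ChangeLoggingTagStore(new InMemoryTagStore(), log)` without changing their code, which is what makes logging pluggable in this design.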

For interactive queries, you implement a corresponding QueryableStoreType to integrate your custom store. Cf. https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores

You are right that Interactive Queries is a read-only interface for the existing stores, because the Processors should be responsible for maintaining the stores. However, nothing prevents you from opening up your custom store for writes, too. Be aware that this will make your application inherently non-deterministic: if you rewind an input topic and reprocess it, it might compute a different result, depending on what "external store writes" are performed. You should consider doing any write to the store via the input topics. But it's your decision. If you allow "external writes", you will need to make sure that they get logged, too, in case you want to implement logging.
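The non-determinism caused by external writes can be shown with a small plain-Java sketch. TotalStore and its reprocess method are hypothetical names, and a list of numbers stands in for the input topic; the only point is that state derived purely from the input can be recomputed, while a write that bypasses the input cannot.

```java
import java.util.List;

// Hypothetical store holding a running total.
class TotalStore {
    private long total;

    // Used by the processor for each input record, and (problematically) by external writers.
    void add(long delta) {
        total += delta;
    }

    long total() {
        return total;
    }

    // Reprocessing: rebuild the state from the input records alone.
    static TotalStore reprocess(List<Long> input) {
        TotalStore store = new TotalStore();
        for (long value : input) {
            store.add(value);
        }
        return store;
    }
}
```

If the input is 1, 2, 3 and some external caller additionally invokes `add(10)` on the live store, the live total is 16, while rewinding and reprocessing the same input gives 6. Routing that extra write through the input instead would make both runs agree.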