15
votes

When defining a topology in Kafka Streams, a global state store can be added. It needs a source topic as well as a ProcessorSupplier. The processor receives records and could theoretically transform them before adding them to the store. But during restoration, the records are inserted directly from the source topic (changelog) into the global state store, skipping any transformation done in the processor.

   +-------------+             +-------------+              +---------------+
   |             |             |             |              |    global     |
   |source topic  ------------->  processor  +-------------->    state      |
   |(changelog)  |             |             |              |    store      |
   +-------------+             +-------------+              +---------------+
          |                                                         ^
          |                                                         |
          +---------------------------------------------------------+
              record directly inserted during restoration

StreamsBuilder#addGlobalStore(StoreBuilder storeBuilder, String topic, Consumed consumed, ProcessorSupplier stateUpdateSupplier) Adds a global StateStore to the topology.

As per the documentation:

NOTE: you should not use the Processor to insert transformed records into the global state store. This store uses the source topic as changelog and during restore will insert records directly from the source. This ProcessorNode should be used to keep the StateStore up-to-date.

In parallel, a major bug is currently open on the Kafka bug tracker: KAFKA-7663, "Custom Processor supplied on addGlobalStore is not used when restoring state from topic". It describes exactly the behavior stated in the documentation, yet it seems to be an accepted bug.

I am wondering if KAFKA-7663 is indeed a bug or not. According to the documentation, it seems to have been designed like this, in which case I struggle to understand the use case.
Can someone explain the major use cases of this low-level API? The only thing I can think of is side effects, for example doing some logging in the processor.

Bonus question: If the source topic acts as the changelog of the global store, when a record is deleted from the topic because its retention has expired, will it be removed from the global state store? Or will the removal only take place in the store after a full restoration from the changelog?

2
Note that older documentation was not pointing out the issue, and we just updated the doc as an "intermediate fix". - Matthias J. Sax

2 Answers

13
votes

Yeah, this is quite a weird little catch-22, but the documentation is correct. The Processor for a global state store must not do anything to the records but persist them into the store.

AFAIK, this isn't a philosophical issue, just a practical one. The reason is simply the behavior you observe... Streams treats the input topic as a changelog topic for the store and therefore bypasses the processor (as well as deserialization) during restoration.

The reason that state restoration bypasses any processing is that usually the data in a changelog is identical to the data in the store, so it would actually be wrong to do anything new to it. Plus, it's more efficient just to take the bytes off the wire and bulk-write them into the state stores. I say "usually" because in this case, the input topic isn't exactly like a normal changelog topic, in that it doesn't receive its writes during store puts.
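To make the mismatch concrete, here is a toy model in plain Java. It is only a sketch: ordinary collections stand in for the topic and the store, and every name in it (RestoreBypassDemo, Rec, liveUpdate, restore) is made up for illustration and is not part of the Kafka Streams API. It assumes a processor that upper-cases values before storing them.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model only: a List stands in for the source topic, a Map for the store.
final class RestoreBypassDemo {

    // One record of the hypothetical source topic.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Live path: every record passes through the processor's transformation.
    static Map<String, String> liveUpdate(List<Rec> topic, UnaryOperator<String> transform) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            store.put(r.key, transform.apply(r.value));
        }
        return store;
    }

    // Restore path: records are copied as-is, the processor is never invoked.
    static Map<String, String> restore(List<Rec> topic) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            store.put(r.key, r.value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Rec> topic = List.of(new Rec("k1", "a"), new Rec("k2", "b"));
        Map<String, String> live = liveUpdate(topic, String::toUpperCase);
        Map<String, String> restored = restore(topic);
        // The two stores disagree as soon as the transformation is not the identity.
        System.out.println("live     = " + live);
        System.out.println("restored = " + restored);
        System.out.println("consistent = " + live.equals(restored));
    }
}
```

An instance that built its store live and one that restored it end up with different contents, which is why the processor should only persist records unchanged.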

For what it's worth, I also struggle to understand the use case. Seemingly, we should either:

  1. Get rid of that processor entirely, and always just dump the binary data off the wire into the stores, just like restoration does.
  2. Re-design global stores to allow arbitrary transformations before the global store. We could either:
    • continue to use the input topic and deserialize and invoke the processors during restoration as well, OR
    • add a real changelog for global stores, such that we'd poll the input topic, apply some transformations, then write to the global store and the global-store-changelog. Then, we can use the changelog (not the input) for restoration and replication.

By the way, if you want the latter behavior, you can approximate it right now by applying your transformations in a regular stream and then using to("my-global-changelog") to manufacture a "changelog" topic. Then, you'd create the global store to read from my-global-changelog instead of the input.
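The shape of that workaround can be sketched with plain Java collections. This is a toy model, not Kafka Streams code: DerivedChangelogDemo, Rec, transformToChangelog and copyIntoStore are hypothetical names, and the upper-casing transformation is just an assumed example. The point is that the transformation runs once, upstream of the derived topic, so the global store only ever copies records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Toy model only: Lists stand in for topics, a Map for the global store.
final class DerivedChangelogDemo {

    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stage 1 (a regular stream in a real topology): transform each input
    // record and append the result to the derived "changelog" topic.
    static List<Rec> transformToChangelog(List<Rec> input, UnaryOperator<String> transform) {
        List<Rec> changelog = new ArrayList<>();
        for (Rec r : input) {
            changelog.add(new Rec(r.key, transform.apply(r.value)));
        }
        return changelog;
    }

    // Stage 2: the global store copies records verbatim. The same method models
    // live updates and restoration, because neither of them transforms anything.
    static Map<String, String> copyIntoStore(List<Rec> changelog) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : changelog) {
            store.put(r.key, r.value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(new Rec("k1", "a"), new Rec("k2", "b"));
        List<Rec> changelog = transformToChangelog(input, String::toUpperCase);
        Map<String, String> live = copyIntoStore(changelog);
        Map<String, String> restored = copyIntoStore(changelog);
        System.out.println("consistent = " + live.equals(restored));
    }
}
```

The cost of this design is an extra topic holding the transformed data, in exchange for restoration that matches live processing.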

So, to give you a direct answer, KAFKA-7663 is not a bug. I'll comment on the ticket proposing to turn it into a feature request.

Bonus answer: Topics that act as changelogs for state stores must not be configured with retention. Practically speaking, this means you should prevent infinite growth by enabling compaction, and disable log retention.

In practice, old data falling out of retention and getting dropped is not an "event", and consumers have no way of knowing if/when it happens. Therefore, it's not possible to remove data from the state stores in response to this non-event. It would happen as you describe... the records would just sit there in the global store indefinitely. If/when an instance is replaced, the new one would restore from the input and (obviously) only receive records that exist in the topic at that time. Thus, the Streams cluster as a whole would wind up with an inconsistent view of the global state. That's why you should disable retention.

The right way to "drop" old data from the store would be to just write a tombstone for the desired key into the input topic. This would then be correctly propagated to all members of the cluster, applied correctly during restoration, AND correctly compacted by the brokers.
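The difference between the two ways of "losing" a record can be illustrated with another toy model in plain Java. Again, this is only a sketch under stated assumptions: TombstoneDemo, Rec and apply are hypothetical names, a List stands in for the topic, and a null value plays the role of a tombstone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a List stands in for the topic, a Map for the global store.
final class TombstoneDemo {

    // A null value models a tombstone for the key.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Apply records in order: a tombstone deletes the key, anything else upserts it.
    static void apply(Map<String, String> store, List<Rec> records) {
        for (Rec r : records) {
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
        }
    }

    public static void main(String[] args) {
        // Case 1: retention silently drops ("k1", "a") from the topic.
        Map<String, String> running = new HashMap<>();
        apply(running, List.of(new Rec("k1", "a"), new Rec("k2", "b")));
        Map<String, String> fresh = new HashMap<>();
        apply(fresh, List.of(new Rec("k2", "b"))); // what is left in the topic
        System.out.println("after retention, consistent = " + running.equals(fresh));

        // Case 2: a tombstone for k1 is written to the topic instead.
        apply(running, List.of(new Rec("k1", null))); // running instance consumes it
        Map<String, String> restored = new HashMap<>();
        apply(restored, List.of(new Rec("k1", "a"), new Rec("k2", "b"), new Rec("k1", null)));
        System.out.println("after tombstone, consistent = " + running.equals(restored));
    }
}
```

In the first case the running instance never learns that k1 is gone, while a freshly restored one never sees it. In the second case both converge on the same contents.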

I hope this all helps. Definitely, please chime in on the ticket and help us shape the API to be more intuitive!

0
votes

At present there doesn't seem to be a way to listen to changes on a GlobalKTable.

You can achieve a similar result with a global store and custom processor.

I stumbled across this here: "How to be notified about updates to state store of GlobalKTable?"

I'm not arguing that this is a good use case, but as a workaround it can be helpful.