We have a Kafka producer that produces keyed messages at a very high frequency to topics with a retention time of 10 hours. These messages are real-time updates, and the key is the ID of the element whose value has changed. So each topic acts as a changelog and contains many duplicate keys.
What we are trying to achieve is that when a Kafka consumer launches, regardless of its last known state (new consumer, crashed, restarted, etc.), it constructs a table with the latest values of all the keys in a topic and then keeps listening for new updates as normal, while keeping the load on the Kafka server to a minimum and letting the consumer do most of the work. We have tried several approaches, and none of them seems ideal.
What we tried:
1 changelog topic + 1 compacted topic:
- The producer sends the same message to both topics inside a transaction to ensure that both sends succeed.
- The consumer launches and requests the latest offset of the changelog topic.
- It consumes the compacted topic from the beginning to construct the table.
- It continues consuming the changelog from the requested offset.
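The consumer-side steps above can be sketched without a broker. This is a minimal in-memory simulation, not Kafka client code: `Update`, `BootstrapSketch`, and `bootstrap` are hypothetical names, the two lists stand in for the compacted topic and the changelog topic, and the offset recorded at launch is assumed to be a plain index into the changelog list.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one keyed message; not a Kafka class.
final class Update {
    final int key;
    final String value;

    Update(int key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class BootstrapSketch {
    // Builds the table from the compacted snapshot, then applies the
    // changelog entries starting at the offset recorded at launch.
    static Map<Integer, String> bootstrap(List<Update> compacted,
                                          List<Update> changelog,
                                          int startOffset) {
        Map<Integer, String> table = new HashMap<>();

        // Phase 1: read the whole compacted topic. If a key appears more
        // than once, the later entry overwrites the earlier one.
        for (Update u : compacted) {
            table.put(u.key, u.value);
        }

        // Phase 2: skip changelog entries before startOffset and apply
        // everything from startOffset onward.
        for (int i = startOffset; i < changelog.size(); i++) {
            Update u = changelog.get(i);
            table.put(u.key, u.value);
        }
        return table;
    }
}
```

In this simulation, duplicate keys left behind by compaction do not affect the final table, because each later entry replaces the earlier one. The duplicates only cost extra reads.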
Cons:
- Duplicates in the compacted topic are very likely, even with log compaction set to run as frequently as possible.
- It doubles the number of topics on the Kafka server.
KSQL:
With KSQL, we either have to write a KTable back out as a topic so that consumers can see it (extra topics), or have consumers execute KSQL SELECT statements against the KSQL REST server to query the table (not as fast or performant as the Kafka APIs).
Kafka Consumer API:
The consumer starts and consumes the topic from the beginning. This worked perfectly, but the consumer has to consume the full 10 hours of changelog to construct the latest-values table.
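To make the cost of this approach concrete, here is a minimal in-memory sketch, again not Kafka client code. `ReplaySketch` and `apply` are hypothetical names, and each call to `apply` stands in for one record read from the topic. It only illustrates that the work done at startup grows with the number of records retained, while the resulting table grows with the number of distinct keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that folds a replayed changelog into a table.
final class ReplaySketch {
    final Map<Integer, String> table = new HashMap<>();
    long recordsRead = 0;

    // Applies one record: a later record for the same key replaces
    // the earlier value, and every record counts as one read.
    void apply(int key, String value) {
        table.put(key, value);
        recordsRead++;
    }
}
```

For example, replaying five updates spread over two keys leaves a table of two entries, but all five records still have to be fetched and processed.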
Kafka Streams:
By using KTables as follows:
KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));
Kafka Streams will create 1 topic on the Kafka server per KTable (named {consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog), which will result in a huge number of topics since we have a large number of consumers.
From what we have tried, it looks like we need to either increase the server load, or the consumer launch time. Isn't there a "perfect" way to achieve what we're trying to do?
Thanks in advance.