3 votes

We have a Kafka producer that produces keyed messages at a very high frequency to topics whose retention time is 10 hours. These messages are real-time updates, and the key is the ID of the element whose value has changed. So the topic acts as a changelog and will contain many duplicate keys.

Now, what we're trying to achieve is that when a Kafka consumer launches, regardless of its last known state (new consumer, crashed, restarted, etc.), it will somehow construct a table with the latest values of all the keys in a topic, and then keep listening for new updates as normal, keeping the load on the Kafka server to a minimum and letting the consumer do most of the work. We tried many approaches and none of them seems ideal.

What we tried:

1 changelog topic + 1 compact topic:

  1. The producer sends the same message to both topics, wrapped in a transaction to ensure a successful send.
  2. Consumer launches and requests the latest offset of the changelog topic.
  3. Consumes the compacted topic from the beginning to construct the table.
  4. Continues consuming the changelog from the requested offset.
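
For illustration, here is a rough sketch of what steps 2-4 could look like with the plain Consumer API, assuming single-partition topics with placeholder names "changelog" and "changelog-compacted", integer keys, and opaque byte[] values:

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BootstrapThenFollow {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        Map<Integer, byte[]> table = new HashMap<>();   // latest value per key

        try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition changelog = new TopicPartition("changelog", 0);
            TopicPartition compacted = new TopicPartition("changelog-compacted", 0);

            // Step 2: remember where the changelog currently ends.
            long changelogEnd = consumer.endOffsets(List.of(changelog)).get(changelog);

            // Step 3: read the compacted topic from the beginning to build the table.
            consumer.assign(List.of(compacted));
            consumer.seekToBeginning(List.of(compacted));
            long compactedEnd = consumer.endOffsets(List.of(compacted)).get(compacted);
            while (consumer.position(compacted) < compactedEnd) {
                for (ConsumerRecord<Integer, byte[]> rec : consumer.poll(Duration.ofMillis(500))) {
                    table.put(rec.key(), rec.value());
                }
            }

            // Step 4: switch to the changelog and continue from the remembered offset.
            consumer.assign(List.of(changelog));
            consumer.seek(changelog, changelogEnd);
            while (true) {
                for (ConsumerRecord<Integer, byte[]> rec : consumer.poll(Duration.ofMillis(500))) {
                    table.put(rec.key(), rec.value());
                }
            }
        }
    }
}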

Cons:

  • Duplicates in the compacted topic are still very likely, even with the log compaction frequency set as high as possible.
  • Twice the number of topics on the Kafka server.

KSQL:

With KSQL we either have to rewrite a KTable as a topic so that consumers can see it (extra topics), or we need consumers to execute KSQL SELECT queries against the KSQL REST server to query the table (not as fast and performant as the Kafka APIs).

Kafka Consumer API:

Consumer starts and consumes the topic from the beginning. This worked perfectly, but the consumer has to consume the 10-hour changelog to construct the table of last values.

Kafka Streams:

By using KTables as follows:

KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));

Kafka Streams will create one topic on the Kafka server per KTable (named {consumer_app_id}-{topic_name}-STATE-STORE-0000000000-changelog), which will result in a huge number of topics, since we have a big number of consumers.

From what we have tried, it looks like we need to either increase the server load, or the consumer launch time. Isn't there a "perfect" way to achieve what we're trying to do?

Thanks in advance.


2 Answers

3 votes

By using KTables, Kafka Streams will create one topic on the Kafka server per KTable, which will result in a huge number of topics since we have a big number of consumers.

If you are just reading an existing topic into a KTable (via StreamsBuilder#table()), then no extra topics are being created by Kafka Streams. Same for KSQL.

It would help if you could clarify what exactly you want to do with the KTable(s). Apparently you are doing something that does result in additional topics being created?

1 changelog topic + 1 compact topic:

Why were you thinking about having two separate topics? Normally, changelog topics should always be compacted. And given your use case description, I don't see a reason why it should not be:

Now, what we're trying to achieve is that when a Kafka consumer launches, regardless of its last known state (new consumer, crashed, restarted, etc.), it will somehow construct a table with the latest values of all the keys in a topic, and then keep listening for new updates as normal [...]

Hence compaction would be very useful for your use case. It would also prevent this problem you described:

Consumer starts and consumes the topic from the beginning. This worked perfectly, but the consumer has to consume the 10-hour changelog to construct the table of last values.

Note that, to reconstruct the latest table values, all three of Kafka Streams, KSQL, and the Kafka Consumer must read the table's underlying topic completely (from beginning to end). If that topic is NOT compacted, this might indeed take a long time depending on the data volume, topic retention settings, etc.
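
As a side note, a compacted topic is simply a topic with cleanup.policy=compact. A minimal sketch of creating one with the Java AdminClient (topic name, partition count, and replication factor are placeholders; the same config can also be set with the kafka-topics/kafka-configs command-line tools):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateCompactedChangelog {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder name/sizing; the important part is cleanup.policy=compact.
            NewTopic changelog = new NewTopic("market-data-changelog", 6, (short) 3)
                    .configs(Map.of(
                            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                            // lower dirty ratio -> the log cleaner compacts more aggressively
                            TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"));
            admin.createTopics(List.of(changelog)).all().get();
        }
    }
}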

From what we have tried, it looks like we need to either increase the server load, or the consumer launch time. Isn't there a "perfect" way to achieve what we're trying to do?

Without knowing more about your use case, particularly what you want to do with the KTable(s) once they are populated, my answer would be:

  • Make sure the "changelog topic" is also compacted.
  • Try KSQL first. If this doesn't satisfy your needs, try Kafka Streams. If this doesn't satisfy your needs, try the Kafka Consumer.

For example, I wouldn't use the Kafka Consumer if it is supposed to do any stateful processing with the "table" data, because the Kafka Consumer lacks built-in functionality for fault-tolerant stateful processing.

0 votes

Consumer starts and consumes the topic from the beginning. This worked perfectly, but the consumer has to consume the 10-hour changelog to construct the table of last values.

The first time your application starts up, what you said is correct.

To avoid this during every restart, store the key-value data in a file.

For example, you might want to use a persistent map (like MapDB).

Since you give the consumer a group.id and commit the offset either periodically or after each record is stored in the map, the next time your application restarts it will resume from the last committed offset for that group.id.

So the long startup time occurs only on the very first run. As long as you have the file, you don't need to consume from the beginning.

If the file is not there or has been deleted, just seekToBeginning() in the KafkaConsumer and build the map again.

You need to store these key-value pairs somewhere for retrieval anyway, so why can't it be a persistent store?
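
A minimal sketch of that idea, assuming integer keys, opaque byte[] values, MapDB (3.x API) as the persistent map, and placeholder topic/group names:

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Serializer;

public class PersistentTableConsumer {
    public static void main(String[] args) {
        // File-backed map that survives restarts.
        DB db = DBMaker.fileDB("latest-values.db").transactionEnable().make();
        Map<Integer, byte[]> table =
                db.hashMap("table", Serializer.INTEGER, Serializer.BYTE_ARRAY).createOrOpen();

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "latest-values-builder"); // placeholder group.id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");     // read from the beginning on the very first run
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic_name"));   // placeholder topic
            while (true) {
                for (ConsumerRecord<Integer, byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                    table.put(rec.key(), rec.value());   // last value per key wins
                }
                db.commit();             // flush the map to disk first ...
                consumer.commitSync();   // ... then commit offsets, so the file is never behind them
            }
        } finally {
            db.close();
        }
    }
}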


If you want to use Kafka Streams for whatever reason, an alternative (not as simple as the above) is to use a persistent state store.

For example, a persistent global store.

streamsBuilder.addGlobalStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(topic),   // RocksDB-backed store, named after the topic
                keySerde,
                valueSerde),
        topic,
        Consumed.with(keySerde, valueSerde),
        this::updateValue);
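
The this::updateValue above is assumed to supply the processor that applies each record to the global store. With the classic Processor API it could look roughly like this (Integer/MarketData types borrowed from the question's example; topic doubles as the store name, matching Stores.persistentKeyValueStore(topic) above):

// Hypothetical sketch of the state-update processor supplied as this::updateValue.
// Assumes the classic org.apache.kafka.streams.processor.Processor API; "topic" and
// the MarketData type come from the surrounding code.
private Processor<Integer, MarketData> updateValue() {
    return new Processor<Integer, MarketData>() {
        private KeyValueStore<Integer, MarketData> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<Integer, MarketData>) context.getStateStore(topic);
        }

        @Override
        public void process(Integer key, MarketData value) {
            store.put(key, value);   // keep only the latest value per key
        }

        @Override
        public void close() { }
    };
}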

P.S.: There will be a file called .checkpoint in the state directory, which stores the offsets. If the topic is deleted in the meantime, you will get an OffsetOutOfRangeException. You may want to handle this, perhaps by using an UncaughtExceptionHandler.

Refer to https://stackoverflow.com/a/57301986/2534090 for more.

Finally, it is better to use a Consumer with a persistent file rather than Streams for this, because of the simplicity it offers.