
I'm looking for a way to build a reprocessing tool in Kafka Streams that reprocesses data from the beginning of a topic (applying some filters and writing updated versions of those events back to the same topic), while a long-running application is concurrently processing data from that same topic.

In order to reprocess only up to the point in time at which the tool is launched, and stop afterwards, it needs to know when to stop, i.e. the latest produced offset of each partition at that point. For example, a (partition -> offset) map could be constructed before starting the topology to hold those limits; the application could then stop once the limits are reached, comparing the current partition and offset (via the Processor API) against the offset limits in that initial map.

Is it possible/does it make sense to access the latest offset information from within Kafka Streams? Is there another way to work around it? (I guess the offsets can be obtained with a regular Kafka consumer, by seeking to the end and getting the position, but I'm asking whether there is an integrated solution within Kafka Streams.)
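For reference, the consumer-based workaround could be sketched as follows (assumes a running broker; the helper name and configuration are illustrative). `KafkaConsumer#endOffsets` asks the broker for the end offset of each partition directly, so no assign/seek-to-end/position dance is needed:

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Illustrative helper: capture the end offset of every partition of a
// topic before starting the topology. Note that the end offset is the
// offset of the *next* record to be written, so the last existing
// record sits at (endOffset - 1).
def captureEndOffsets(bootstrapServers: String, topic: String): Map[Int, Long] = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
  props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(info => new TopicPartition(topic, info.partition))
    consumer.endOffsets(partitions.asJava).asScala
      .map { case (tp, offset) => tp.partition -> offset.longValue }
      .toMap
  } finally consumer.close()
}
```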

Also, how can the application be stopped gracefully only when all of the partitions have reached their offset limits, given that this information is distributed across instances, so the state of all instances would need to be known?

Kafka/KafkaStreams 2.1, Scala 2.12


1 Answer


Using a consumer to get the end offsets seems reasonable. For stopping the application, you would need to build a manual solution that tracks the progress. For example, using transformValues() you could inspect the topic name, partition, and offset of each input record (via the ProcessorContext object provided in the init() method). This should allow you to call KafkaStreams#close() once all data is processed.
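The tracking logic could be sketched as a small helper (the name ProgressTracker is illustrative, not part of the Kafka Streams API). Inside transformValues() you would call update(context.partition(), context.offset()) for each record; once it returns true, trigger KafkaStreams#close() from a separate thread, since close() should not be called from a stream thread itself:

```scala
// Illustrative per-instance progress tracker. endOffsets maps each
// partition to its captured end offset (the position of the *next*
// record, hence the -1 in the comparison below).
final class ProgressTracker(endOffsets: Map[Int, Long]) {
  private var seen = Map.empty[Int, Long]

  /** Record the latest consumed offset for a partition; returns true
    * once every partition has reached its end offset. */
  def update(partition: Int, offset: Long): Boolean = synchronized {
    seen = seen.updated(partition, math.max(offset, seen.getOrElse(partition, -1L)))
    endOffsets.forall { case (p, end) => seen.getOrElse(p, -1L) >= end - 1 }
  }
}
```

With multiple instances, each instance only sees its own assigned partitions, so a tracker like this can only decide completion locally; a shared signal (e.g. an external store or a control topic) would still be needed to coordinate a global shutdown.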

You might be interested in this KIP (inactive at the moment) that discusses similar ideas: https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams