My Kafka sink connector reads from multiple topics (configured with 10 tasks) and processes upwards of 300 records from all topics. Based on the information held in each record, the connector may perform certain operations.
Here is an example of the key:value pair in a trigger record:
"REPROCESS":"my-topic-1"
Upon reading this record, I would then need to reset the offsets of the topic 'my-topic-1' to 0 in each of its partitions.
I have read in many places that creating a new KafkaConsumer, subscribing to the topic's partitions, then calling the subscribe(...) method is the recommended way. For example,
    public class MyTask extends SinkTask {

        @Override
        public void put(Collection<SinkRecord> records) {
            records.forEach(record -> {
                // Null-safe comparison: record.key() may be null for keyless records
                if ("REPROCESS".equals(String.valueOf(record.key()))) {
                    reprocessTopicRecords(record);
                } else {
                    // do something else
                }
            });
        }

        private void reprocessTopicRecords(SinkRecord record) {
            // reprocessorProps and deserializer are defined elsewhere (not shown)
            KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
                new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
            // The record value carries the name of the topic to reprocess
            reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // do offset reset here
                    }
                }
            );
        }
    }
However, the above strategy does not work for my case because:
1. It depends on a group rebalance taking place, which does not always happen.
2. The 'partitions' passed to the onPartitionsAssigned method are dynamically assigned partitions, meaning they are only a subset of the full set of partitions that need to have their offsets reset. For example, this SinkTask will be assigned only 2 of the 8 partitions that hold the records for 'my-topic-1'.
I've also looked into using assign(), but this is not compatible with the distributed consumer model (consumer groups) in the SinkConnector/SinkTask implementation.
I am aware that the Kafka command-line tool kafka-consumer-groups can do exactly what I want (I think):
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
To summarize, I want to reset the offsets of all partitions for a given topic using Java APIs and let the Sink Connector pick up the offset changes and continue to do what it has been doing (processing records).
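To make the goal concrete, here is a minimal sketch of the target state I am after. It deliberately uses plain partition numbers instead of Kafka's TopicPartition so that it runs without Kafka on the classpath. The class and method names (OffsetResetPlan, resetToBeginning, notAssignedToThisTask) are hypothetical, and the partition count of 8 and the assigned partitions 2 and 5 are only illustrative values taken from the example above. It does not talk to a broker; it only shows which offsets would have to be applied, and which of them a single task cannot reach through its own assignment.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper: describes the desired reset, it does not perform it.
public class OffsetResetPlan {

    // Target state: every partition 0..partitionCount-1 mapped to offset 0.
    // partitionCount is assumed to be known (e.g. looked up from topic metadata).
    public static Map<Integer, Long> resetToBeginning(int partitionCount) {
        if (partitionCount < 0) {
            throw new IllegalArgumentException("partitionCount must be >= 0");
        }
        Map<Integer, Long> targets = new TreeMap<>();
        for (int p = 0; p < partitionCount; p++) {
            targets.put(p, 0L);
        }
        return targets;
    }

    // Partitions of the topic that are NOT in this task's assignment,
    // i.e. the ones a per-task reset would miss.
    public static Set<Integer> notAssignedToThisTask(int partitionCount, Set<Integer> assigned) {
        Set<Integer> missing = new TreeSet<>(resetToBeginning(partitionCount).keySet());
        missing.removeAll(assigned);
        return missing;
    }

    public static void main(String[] args) {
        // Illustrative values: 8 partitions in 'my-topic-1', this task owns 2 of them.
        Set<Integer> assigned = new TreeSet<>(Arrays.asList(2, 5));
        System.out.println("offsets to apply: " + resetToBeginning(8));
        System.out.println("missed by this task: " + notAssignedToThisTask(8, assigned));
    }
}
```

Whatever API ends up applying the reset, it would need to cover the full map from resetToBeginning, not just the partitions this task happens to own.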
Thanks in advance.