3 votes

My kafka sink connector reads from multiple topics (configured with 10 tasks) and processes upwards of 300 records from all topics. Based on the information held in each record, the connector may perform certain operations.

Here is an example of the key:value pair in a trigger record:

"REPROCESS":"my-topic-1"

Upon reading this record, I would then need to reset the offsets of the topic 'my-topic-1' to 0 in each of its partitions.

I have read in many places that the recommended way is to create a new KafkaConsumer, call subscribe(...) on the topic, and perform the offset reset inside a ConsumerRebalanceListener. For example,

import java.util.Arrays;
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import com.fasterxml.jackson.databind.JsonNode;

public class MyTask extends SinkTask {

    @Override
    public void put(Collection<SinkRecord> records) {
        records.forEach(record -> {
            if (record.key().toString().equals("REPROCESS")) {
                reprocessTopicRecords(record);
            } else {
                // do something else
            }
        });
    }

    private void reprocessTopicRecords(SinkRecord record) {
        // reprocessorProps and deserializer are initialized elsewhere in the task
        KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
            new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
        // the record's value holds the name of the topic to reprocess, e.g. "my-topic-1"
        reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // do offset reset here
                }
            }
        );
    }
}

However, the above strategy does not work for my case because:

  1. It depends on a group rebalance taking place (which does not always happen).

  2. The 'partitions' passed to onPartitionsAssigned are only the dynamically assigned partitions, i.e. a subset of the full set of partitions whose offsets need to be reset. For example, this SinkTask will be assigned only 2 of the 8 partitions that hold the records for 'my-topic-1'.

I've also looked into using assign() but this is not compatible with the distributed consumer model (consumer groups) in the SinkConnector/SinkTask implementation.

I am aware that the kafka command line tool kafka-consumer-groups can do exactly what I want (I think): https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

To summarize, I want to reset the offsets of all partitions for a given topic using Java APIs and let the Sink Connector pick up the offset changes and continue to do what it has been doing (processing records).

Thanks in advance.

2 questions: 1) Why do you want to reprocess a topic? 2) Why don't you just use a fresh new groupId? - Gremi64
1) Take for example a sink connector that consumes data, transforms it, and POSTs/PUTs it to a DB. If this logic is modified based on consumer application needs, it would be ideal if the same data set could be reprocessed and fed to it so that those changes take effect. 2) Our application relies on the kafka-connector to continue doing its work when it is resumed. Of the 20+ topics the connector consumes, only 2 or 3 topics may be reprocessed at a time; creating a new groupId for just these reprocessed topics is a bad idea for us in terms of offset/consumer management. - Jason Choi

3 Answers

3 votes

I was able to achieve resetting offsets for a kafka connect consumer group by using a series of Confluent's kafka-rest-proxy APIs: https://docs.confluent.io/current/kafka-rest/api.html

This implementation no longer requires the 'trigger record' approach first described in the original post and is purely REST API based.

  1. Temporarily delete the kafka connector (this deletes the connector's consumers)

  2. Create a consumer instance for the same consumer group ("connect-<connector-name>")

  3. Have the instance subscribe to the requested topic you want to reset

  4. Do a dummy poll ('subscribe' is evaluated lazily)

  5. Reset consumer group topic offsets for specified topic

  6. Do a dummy poll ('seek' is evaluated lazily), then commit the current offset state (in the proxy) for the consumer

  7. Re-create the kafka connector (with the same connector name) - after rebalancing, consumers will join the group and read the last committed offset (starting from 0)

  8. Delete the temporary consumer instance
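
If you would rather script the REST proxy calls than issue them by hand, here is a rough, untested sketch of steps 2-6 (and 8) using Java's built-in HttpClient. The proxy host, consumer group, instance name, topic, and partition list are placeholders, and the endpoint paths and request bodies should be double-checked against the REST proxy documentation linked above (in particular, use the base_uri returned by the create-instance call rather than building it by hand).

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestProxyOffsetReset {

    // All of these values are placeholders for illustration only
    static final String PROXY = "http://rest-proxy:8082";
    static final String GROUP = "connect-my-sink-connector";   // the connector's consumer group
    static final String INSTANCE = "offset-reset-instance";
    static final String BASE = PROXY + "/consumers/" + GROUP + "/instances/" + INSTANCE;
    static final HttpClient HTTP = HttpClient.newHttpClient();

    public static void main(String[] args) throws Exception {
        // 2. create a consumer instance in the connector's group
        call("POST", PROXY + "/consumers/" + GROUP,
             "{\"name\":\"" + INSTANCE + "\",\"format\":\"json\"}");

        // 3. subscribe it to the topic whose offsets should be reset
        call("POST", BASE + "/subscription", "{\"topics\":[\"my-topic-1\"]}");

        // 4. dummy poll so the subscription is actually evaluated
        call("GET", BASE + "/records", null);

        // 5. seek all partitions of the topic back to the beginning (list every partition here)
        call("POST", BASE + "/positions/beginning",
             "{\"partitions\":[{\"topic\":\"my-topic-1\",\"partition\":0}]}");

        // 6. dummy poll so the seek is evaluated, then commit the proxy's current positions
        call("GET", BASE + "/records", null);
        call("POST", BASE + "/offsets", null);

        // 8. delete the temporary consumer instance once the connector has been re-created
        call("DELETE", BASE, null);
    }

    static void call(String method, String url, String body) throws Exception {
        HttpRequest.BodyPublisher publisher = body == null
            ? HttpRequest.BodyPublishers.noBody()
            : HttpRequest.BodyPublishers.ofString(body);
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .header("Content-Type", "application/vnd.kafka.v2+json")
            .header("Accept", "application/vnd.kafka.json.v2+json")
            .method(method, publisher)
            .build();
        HttpResponse<String> response = HTTP.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(method + " " + url + " -> " + response.statusCode());
    }
}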

If you are able to use the CLI, Steps 2-6 can be replaced with:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

As for those of you trying to do this in the kafka connector code through native Java APIs, you're out of luck :-(

1 vote

You're looking for the seek method. Either to an offset

consumer.seek(new TopicPartition("topic-name", partition), offset);

Or seekToBeginning
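
For example, a one-line sketch that rewinds every partition currently assigned to that consumer (assuming you hold a reference to it):

consumer.seekToBeginning(consumer.assignment());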

However, I feel like you'd be competing with the Connect Sink API's consumer group. In other words, assuming you set up the consumer with a separate group id, you'd essentially be consuming records from the source topic twice: once by Connect and once by your own consumer instance.

Unless you explicitly seek Connect's own consumer instance as well (which is not exposed), you'd end up in a weird state. For example, your task would only execute on new records in the topic even though your own consumer is looking at an old offset, or you'd keep receiving newer events while still processing old ones.

Also, eventually you might get a reprocess event pointing at the very beginning of the topic after retention policies have expired the old records, for example, causing your consumer to make no progress at all and to constantly rebalance its group by seeking to the beginning.

0 votes

We had to do a very similar offset resetting exercise.

KafkaConsumer.seek() combined with KafkaConsumer.commitSync() worked well.
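
As a rough sketch of that approach (not our exact code; the group id, topic, and bootstrap server below are placeholders): with the connector stopped, a throwaway consumer using the connector's group id can assign itself every partition of the topic, seek to the beginning, and commit the rewound positions.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SeekAndCommitReset {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");        // placeholder
        props.put("group.id", "connect-my-sink-connector");      // the connector's consumer group
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {

            // collect every partition of the topic and assign them manually (no group subscription)
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor("my-topic-1")
                    .forEach(p -> partitions.add(new TopicPartition(p.topic(), p.partition())));
            consumer.assign(partitions);

            // seek is lazy; position() forces it to resolve to the actual beginning offset
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (TopicPartition tp : partitions) {
                toCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }

            // commit the rewound positions so the connector's group starts from them when it resumes
            consumer.commitSync(toCommit);
        }
    }
}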

There is another option that is worth mentioning, if you are dealing with lots of topics and partitions (javadoc):

AdminClient.alterConsumerGroupOffsets(
  String groupId, 
  Map<TopicPartition,OffsetAndMetadata> offsets
)
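
A hedged usage sketch (group id, topic, and bootstrap server are placeholders; the consumer group generally has to be inactive, e.g. the connector stopped, for the call to succeed):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AdminOffsetReset {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost:9092");   // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // look up every partition of the topic
            TopicDescription description = admin.describeTopics(Collections.singleton("my-topic-1"))
                .all().get().get("my-topic-1");

            // map each partition to offset 0
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            description.partitions().forEach(p ->
                offsets.put(new TopicPartition("my-topic-1", p.partition()), new OffsetAndMetadata(0L)));

            // rewrite the committed offsets for the connector's consumer group
            admin.alterConsumerGroupOffsets("connect-my-sink-connector", offsets).all().get();
        }
    }
}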

We were lucky because we had the luxury to stop the Kafka Connect instance for a while, so there's no consumer group competing.