Assume I have a Kafka topic (test) with 2 partitions and 2 consumer groups (X, Y), each with a single consumer consuming the topic.
Now I want to know the offset of the other consumer group in the same partition. The pseudocode below explains the need.
*** Let's assume this is running in the context of consumer group X
TOPIC = "test"
// consumer for group x
Consumer<K, V> consumerX = new KafkaConsumer<>(consumerProperties);
consumerX.subscribe(Collections.singletonList(TOPIC), new ReportOnRebalance(……..));
// Get the currently assigned partitions; the set may be empty at first, so
// keep polling until a partition has been assigned to consumerX
Set<TopicPartition> topicPartition = consumerX.assignment();
// Get the last committed offset
offsetAndMetadataX = consumerX.committed(topicPartition)
// consumer for group y
Consumer<K, V> consumerY = new KafkaConsumer<>(consumerProperties);
// manually assign because I am interested in the offset for the
// partition consumerX is going to serve for
consumerY.assign(topicPartition)
// Get the last committed offset
offsetAndMetadataY = consumerY.committed(topicPartition)
// Do the required logic with offsetAndMetadataX and offsetAndMetadataY
newOffset = foo(offsetAndMetadataX, offsetAndMetadataY)
// want to reset the offset for this consumerY and in this
// partition
consumerY.seek(topicPartition, bar(newOffset))
// Change the offset for consumerX and start polling for messages
consumerX.seek(topicPartition, newOffset)
while(...) {
consumerX.poll(..)
....
}
*** Now the same code will run in the context of consumer group Y, but the role will be reversed
consumerY.subscribe()
consumerX.assign()
...
consumerX.seek(topicPartition, bar(newOffset))
...
// Change the offset for consumerY and start polling for messages
consumerY.seek(topicPartition, newOffset)
while(...) {
consumerY.poll(..)
....
}
I am not sure whether the above logic will work. The part I am unsure about is this: a consumer in group X calls subscribe on one machine and, let's say, partition 1 is assigned to it, while a consumer in the same group X calls assign on another machine and also seeks to some offset as part of that. Will this work?
Why I want to do this: I want to understand the usage of assign and subscribe. We also need to manually skip processing a few offsets that another consumer group has already processed, or reprocess old offsets that another consumer has already processed.
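To make the need concrete, the foo step in the pseudocode could look roughly like the sketch below. It is only an illustration under assumptions: OffsetReconciler, skipProcessed and reprocess are hypothetical names, and the two committed offsets are assumed to be passed in as nullable Long values, where null means that the group has no committed offset for the partition yet.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of the Kafka client API.
final class OffsetReconciler {
    private OffsetReconciler() {}

    // Skip what the other group has already processed: start from the larger
    // of the two committed offsets. Empty if neither group has committed.
    static OptionalLong skipProcessed(Long own, Long other) {
        if (own == null && other == null) return OptionalLong.empty();
        if (own == null) return OptionalLong.of(other);
        if (other == null) return OptionalLong.of(own);
        return OptionalLong.of(Math.max(own, other));
    }

    // Reprocess what the other group has already processed: start from the
    // smaller of the two committed offsets. Empty if neither has committed.
    static OptionalLong reprocess(Long own, Long other) {
        if (own == null && other == null) return OptionalLong.empty();
        if (own == null) return OptionalLong.of(other);
        if (other == null) return OptionalLong.of(own);
        return OptionalLong.of(Math.min(own, other));
    }

    public static void main(String[] args) {
        // Group X committed offset 5, group Y committed offset 9 (made-up values).
        System.out.println("skip: " + skipProcessed(5L, 9L).getAsLong());
        System.out.println("reprocess: " + reprocess(5L, 9L).getAsLong());
    }
}
```

The resulting value would then be what gets passed to seek for the partition in question; when the result is empty, the consumer would simply keep its default starting position.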