1
votes

Assume I have a Kafka topic (test) with 2 partitions, and 2 consumer groups (X, Y), each with a single consumer that is consuming the topic.

Now, from a consumer in one group, I want to know the committed offset of the other consumer group for the same partition. The pseudocode below explains the need.

*** Let's assume this is running in the context of consumer group X

TOPIC = "test"
// consumer for group X
Consumer<K, V> consumerX = new KafkaConsumer<>(consumerProperties);
consumerX.subscribe(Collections.singletonList(TOPIC), new ReportOnRebalance(……..));

// Get the currently assigned partitions; the set may be empty, so keep
// polling until a partition is assigned to consumerX
Set<TopicPartition> topicPartition = consumerX.assignment();

// Get the last committed offset
offsetAndMetadataX = consumerX.committed(topicPartition)

// consumer for group Y
Consumer<K, V> consumerY = new KafkaConsumer<>(consumerProperties);

// Manually assign, because I am interested in the offset for the
// partition that consumerX is serving
consumerY.assign(topicPartition)

// Get the last committed offset
offsetAndMetadataY = consumerY.committed(topicPartition)

// Do the required logic with offsetAndMetadataX and offsetAndMetadataY
newOffset = foo(offsetAndMetadataX, offsetAndMetadataY)

// Reset the offset for consumerY in this partition
consumerY.seek(topicPartition, bar(newOffset))

// Change the offset for consumerX and start polling for messages
consumerX.seek(topicPartition, newOffset)
while(...) {
    consumerX.poll(..)
    ....
}


*** Now the same code will run in the context of consumer group Y, but the role will be reversed

consumerY.subscribe()
consumerX.assign()
...
consumerX.seek(topicPartition, bar(newOffset))
...
// Change the offset for consumerY and start polling for messages
consumerY.seek(topicPartition, newOffset)
while(...) {
    consumerY.poll(..)
    ....
}

I am not sure whether the above logic will work. The part I am unsure about is this: a consumer in group X calls subscribe on one machine and is assigned, say, partition 1, while another consumer with the same group X calls assign for that partition on a different machine and then seeks to some offset. Will this work?

Why I want to do this: I want to understand how assign and subscribe are meant to be used. We also need to manually skip a few offsets that another consumer group has already processed, or to reprocess old offsets that another consumer group has already processed.
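For illustration, foo in the pseudocode could be as simple as the sketch below. The class and method names are hypothetical (they are not part of any Kafka API), and offsets are modelled as plain long values, which in real code would come from OffsetAndMetadata.offset().

```java
// Hypothetical helper: one possible shape for foo(...) from the pseudocode.
// Nothing here talks to Kafka; it only decides which offset to seek to.
final class OffsetPlanner {
    private OffsetPlanner() {}

    // "Skip" case: resume from whichever committed offset is further ahead,
    // so records the other group already processed are not processed again.
    static long skipAlreadyProcessed(long ownCommitted, long otherCommitted) {
        return Math.max(ownCommitted, otherCommitted);
    }

    // "Reprocess" case: rewind to rewindBy records before the other group's
    // committed offset, never going below offset 0.
    static long reprocessFrom(long otherCommitted, long rewindBy) {
        return Math.max(0L, otherCommitted - rewindBy);
    }
}
```

The result would then be passed to seek for the one partition in question, for example consumerX.seek(partition, OffsetPlanner.skipAlreadyProcessed(offsetX, offsetY)).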

1 Answer

2
votes

I haven't attempted what you're describing here, but from the official documentation, it seems this should work like you want it to:

https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#manualassignment

Key section highlighted here:

Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.
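The warning in that passage is about offset commits. In the question's setup the manually assigned consumer has to share the other group's group.id in order to read that group's committed offsets, so one cautious option is to disable auto-commit for it and never commit from it. A minimal sketch of such a configuration follows; the ConsumerConfigs class, the broker address, and the choice of string deserializers are assumptions made for the example.

```java
import java.util.Properties;

// Hypothetical factory for the consumerProperties used in the question.
final class ConsumerConfigs {
    private ConsumerConfigs() {}

    static Properties forGroup(String groupId) {
        Properties props = new Properties();
        // Assumed broker address; replace with the real cluster.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // committed(...) reports offsets for this group id.
        props.setProperty("group.id", groupId);
        // Do not commit in the background, so an "observer" consumer
        // cannot overwrite the group's committed offsets by accident.
        props.setProperty("enable.auto.commit", "false");
        // Assumed String keys and values.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With this, consumerX would be built from ConsumerConfigs.forGroup("X") and the manually assigned consumerY from ConsumerConfigs.forGroup("Y").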

In other words, a consumer that uses manual assignment (assign) does not take part in the group's dynamic rebalancing. So you need to be careful with offset commits, but it seems Kafka does allow for the scenario you describe.
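One thing to keep in mind, as far as I can tell from the same Javadoc: seek only changes the position of the consumer instance it is called on, while the committed offset of a group changes only on a commit. The toy model below is not the Kafka client; it is a hypothetical in-memory sketch (single partition, and a start offset of 0 when nothing is committed, which in Kafka is actually governed by auto.offset.reset) that separates the two ideas.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model, NOT the Kafka API: group-level committed offsets versus
// per-instance positions, for a single partition.
final class OffsetModel {
    // Committed offset per group id (what the broker would store).
    private final Map<String, Long> committedByGroup = new HashMap<>();

    final class Instance {
        private final String groupId;
        private long position;

        Instance(String groupId) {
            this.groupId = groupId;
            // A new instance starts from its group's committed offset.
            this.position = committedByGroup.getOrDefault(groupId, 0L);
        }

        // Local only: other instances of the same group are unaffected.
        void seek(long offset) { position = offset; }

        // Group-wide: overwrites the committed offset for the group.
        void commit() { committedByGroup.put(groupId, position); }

        long position() { return position; }
    }

    Instance newInstance(String groupId) { return new Instance(groupId); }

    long committed(String groupId) {
        return committedByGroup.getOrDefault(groupId, 0L);
    }
}
```

In this model, a manually assigned instance of group Y that seeks to 42 leaves both the committed offset of Y and the position of the already running instance untouched. Only after it commits does committed("Y") become 42, and the running instance still keeps its own position until it starts again from the committed offset.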