0
votes

I am implementing a custom consumers to topics/partitions assignor in Kafka. To this end, I am overriding the AbstractPartitionAssignor abstract class which in turn implements the ConsumerPartitionAssignor interface.

As part of the custom assignor, I want to send a single (float) information about each partition of each topic that the consumers subscribe to.

I am aware that I can send custom data to the assignor by overriding the default method ByteBuffer subscriptionUserData(Set<String> topics) of the ConsumerPartitionAssignor interface.

However, the issue is that from the method signature above I can not get the list of partitions assigned to the underlined consumer for each of the topics that the consumer registers for.

On the other hand, I can see that the subscription class sent by each of the consumers to the group coordinator has a list of the owned partitions per consumer.

public static final class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    private final List<TopicPartition> ownedPartitions;
    private Optional<String> groupInstanceId;
    .....

Any hint on how I can send custom per partition data to the group coordinator through the method ByteBuffer subscriptionUserData(Set<String> topics), or using any other way that relies only on kafka public APIs.

Thank you.

1

1 Answers

0
votes

To solve the above issue, just use the Assignor callback function onAssignment as shown below

...
private List<TopicPartition> memberAssignment = null;
     @Override
        public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
            memberAssignment = assignment.partitions();
            this.generation = metadata.generationId();
        }

and use memberAssignment inside the function subscriptionUserData (or any where in the Assignor class) to get the list of partitions currently assigned to each consumer.

public ByteBuffer subscriptionUserData(Set<String> topics)