I am implementing a custom assignor in Kafka that maps consumers to topic partitions. To this end, I am extending the AbstractPartitionAssignor
abstract class, which in turn implements the ConsumerPartitionAssignor
interface.
As part of the custom assignor, I want each consumer to send a single float value for each partition of each topic it subscribes to.
I am aware that I can send custom data to the assignor by overriding the default method
ByteBuffer subscriptionUserData(Set<String> topics)
of the ConsumerPartitionAssignor
interface.
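Packing the values themselves does not look like the hard part. As a minimal sketch of what I have in mind, assuming the per-partition floats were already known, the buffer returned from subscriptionUserData could be built with plain java.nio as below. PartitionMetricCodec, its method names and the byte layout are hypothetical choices of mine, not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka: packs one float per (topic, partition)
// into a ByteBuffer of the kind subscriptionUserData(...) returns.
final class PartitionMetricCodec {

    // Assumed layout: [topicCount], then per topic
    // [nameLength][nameBytes][partitionCount], then per partition [id][float].
    static ByteBuffer encode(Map<String, Map<Integer, Float>> metrics) {
        int size = Integer.BYTES;
        for (Map.Entry<String, Map<Integer, Float>> e : metrics.entrySet()) {
            size += Integer.BYTES + e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + Integer.BYTES + e.getValue().size() * (Integer.BYTES + Float.BYTES);
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(metrics.size());
        for (Map.Entry<String, Map<Integer, Float>> e : metrics.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(name.length).put(name).putInt(e.getValue().size());
            for (Map.Entry<Integer, Float> p : e.getValue().entrySet()) {
                buf.putInt(p.getKey()).putFloat(p.getValue());
            }
        }
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    // What the assignor side could do with Subscription.userData().
    static Map<String, Map<Integer, Float>> decode(ByteBuffer data) {
        ByteBuffer buf = data.duplicate(); // leave the caller's position untouched
        Map<String, Map<Integer, Float>> metrics = new TreeMap<>();
        int topicCount = buf.getInt();
        for (int t = 0; t < topicCount; t++) {
            byte[] name = new byte[buf.getInt()];
            buf.get(name);
            int partitionCount = buf.getInt();
            Map<Integer, Float> perPartition = new TreeMap<>();
            for (int p = 0; p < partitionCount; p++) {
                int partitionId = buf.getInt();
                perPartition.put(partitionId, buf.getFloat());
            }
            metrics.put(new String(name, StandardCharsets.UTF_8), perPartition);
        }
        return metrics;
    }

    public static void main(String[] args) {
        Map<Integer, Float> orders = new TreeMap<>();
        orders.put(0, 1.5f);
        orders.put(1, 0.25f);
        Map<String, Map<Integer, Float>> metrics = new TreeMap<>();
        metrics.put("orders", orders); // "orders" is a made-up topic name
        System.out.println(decode(encode(metrics)));
    }
}
```

The sketch only covers serialization; it still needs to know which partitions to report on, which is exactly the information I am missing.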
However, the issue is that the signature of subscriptionUserData only gives me the topic names, so I cannot get the list of partitions currently assigned to the underlying consumer for each topic it subscribes to.
On the other hand, I can see that the Subscription class, which each consumer sends to the group coordinator, holds a list of the partitions owned by that consumer:
public static final class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    private final List<TopicPartition> ownedPartitions;
    private Optional<String> groupInstanceId;
    .....
Any hint on how I can send custom per-partition data to the group coordinator through the method ByteBuffer subscriptionUserData(Set<String> topics),
or in any other way that relies only on Kafka public APIs?
Thank you.