I have the names of several compacted Kafka topics (topic1, topic2, ..., topicN) defined in my Spring application.yaml file. I want to consume all of the records on each topic partition on startup. The number of partitions on each topic is not known in advance.
The official Spring Kafka 2.6.1 documentation suggests that the simplest way to do this is to implement a PartitionFinder and use it in a SpEL expression to dynamically look up the partitions of a topic, and then use the * wildcard in the partition attribute of a @PartitionOffset annotation so that every one of those partitions starts from offset 0 (see Explicit Partition Assignment in the @KafkaListener Annotation documentation):
@KafkaListener(topicPartitions = @TopicPartition(
        topic = "compacted",
        partitions = "#{@finder.partitions('compacted')}",
        partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}
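For reference, the finder bean referenced by the SpEL expression simply asks the broker for the topic's partition numbers. Below is a minimal sketch along the lines of the documentation's example; the configuration class name is mine, and it assumes a String-keyed ConsumerFactory is available:

import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class PartitionFinderConfig {

    // The bean name "finder" matches the "@finder" reference in the SpEL expression above.
    @Bean
    public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
        return new PartitionFinder(consumerFactory);
    }

    public static class PartitionFinder {

        private final ConsumerFactory<String, String> consumerFactory;

        public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
            this.consumerFactory = consumerFactory;
        }

        // Queries the broker for the topic's partitions and returns their numbers
        // as strings (e.g. "0", "1", "2"), which is what the partitions attribute expects.
        public String[] partitions(String topic) {
            try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
                return consumer.partitionsFor(topic).stream()
                        .map(partitionInfo -> String.valueOf(partitionInfo.partition()))
                        .toArray(String[]::new);
            }
        }
    }
}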
Since I have several topics, the resulting code is very verbose:
@KafkaListener(topicPartitions = {
    @TopicPartition(
        topic = "${topic1}",
        partitions = "#{@finder.partitions('${topic1}')}",
        partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
    ),
    @TopicPartition(
        topic = "${topic2}",
        partitions = "#{@finder.partitions('${topic2}')}",
        partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
    ),
    // and many more @TopicPartitions...
    @TopicPartition(
        topic = "${topicN}",
        partitions = "#{@finder.partitions('${topicN}')}",
        partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
    )
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}
How can I make this repetitive configuration more concise by setting the topicPartitions attribute of the @KafkaListener annotation to a dynamically generated array of @TopicPartition annotations (one for each of my N topics)?