What do you mean by "doesn't work"? I just tested it and it works fine...
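    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.IntStream;

    import io.micrometer.core.instrument.Gauge;
    import io.micrometer.core.instrument.MeterRegistry;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.MetricNameTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;

    @SpringBootApplication
    public class So56540759Application {

        public static void main(String[] args) throws IOException {
            ConfigurableApplicationContext context = SpringApplication.run(So56540759Application.class, args);
            System.in.read(); // keep the app running until Enter is pressed
            context.close();
        }

        private MetricName lagNow;
        private MetricName lagMax;

        @Autowired
        private MeterRegistry meters;

        // max.poll.records=1 so the lag drops by one for each delivered record
        @KafkaListener(id = "so56540759", topics = "so56540759", clientIdPrefix = "so56540759",
                properties = "max.poll.records=1")
        public void listen(String in, Consumer<?, ?> consumer) {
            Map<MetricName, ? extends Metric> metrics = consumer.metrics();
            Metric currentLag = metrics.get(this.lagNow);
            Metric maxLag = metrics.get(this.lagMax);
            System.out.println(in
                    + " lag " + currentLag.metricName().name() + ":" + currentLag.metricValue()
                    + " max " + maxLag.metricName().name() + ":" + maxLag.metricValue());
            // the same value as seen through Micrometer
            Gauge gauge = meters.get("kafka.consumer.records.lag.max").gauge();
            System.out.println("lag-max in Micrometer: " + gauge.value());
        }

        @Bean
        public NewTopic topic() {
            return new NewTopic("so56540759", 1, (short) 1);
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            // build the MetricName keys used to look up the consumer's lag metrics
            Set<String> tags = new HashSet<>();
            FetcherMetricsRegistry registry = new FetcherMetricsRegistry(tags, "consumer");
            MetricNameTemplate temp = registry.recordsLagMax;
            this.lagMax = new MetricName(temp.name(), temp.group(), temp.description(),
                    Collections.singletonMap("client-id", "so56540759-0"));
            temp = registry.partitionRecordsLag;
            Map<String, String> tagsMap = new LinkedHashMap<>();
            tagsMap.put("client-id", "so56540759-0");
            tagsMap.put("topic", "so56540759");
            tagsMap.put("partition", "0");
            this.lagNow = new MetricName(temp.name(), temp.group(), temp.description(), tagsMap);
            return args -> IntStream.range(0, 10).forEach(i -> template.send("so56540759", "foo" + i));
        }
    }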
    2019-06-11 12:13:45.803 INFO 32187 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = so56540759-0
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = so56540759
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_committed
        ...
        transaction.timeout.ms = 60000
        ...
    2019-06-11 12:13:45.840 INFO 32187 --- [o56540759-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so56540759-0]
    foo0 lag records-lag:9.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo1 lag records-lag:8.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo2 lag records-lag:7.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo3 lag records-lag:6.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo4 lag records-lag:5.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo5 lag records-lag:4.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo6 lag records-lag:3.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo7 lag records-lag:2.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo8 lag records-lag:1.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
    foo9 lag records-lag:0.0 max records-lag-max:9.0
    lag-max in Micrometer: 9.0
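
The numbers in that output are what you would expect if the lag is roughly "end of the log minus the consumer's position" and the max is a running maximum. Here is a minimal sketch of that arithmetic only; it is a simplified model I am assuming for illustration, not the Kafka client's actual implementation, and `LagSimulation` and `simulate` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;

public class LagSimulation {

    /**
     * Simplified model (an assumption, not Kafka's code): with logEndOffset
     * records in the partition and one record delivered per poll, the lag seen
     * in the listener is logEndOffset minus the position after that record.
     * Each row is {lag, maxLagSoFar}.
     */
    public static List<long[]> simulate(long logEndOffset) {
        List<long[]> rows = new ArrayList<>();
        long maxLag = 0;
        for (long position = 1; position <= logEndOffset; position++) {
            long lag = logEndOffset - position;
            maxLag = Math.max(maxLag, lag);
            rows.add(new long[] {lag, maxLag});
        }
        return rows;
    }

    public static void main(String[] args) {
        // 10 records, as sent by the runner in the test app
        for (long[] row : simulate(10)) {
            System.out.println("lag " + row[0] + " max " + row[1]);
        }
    }
}
```

With 10 records the lag column counts down from 9 to 0 while the max column stays at 9, matching the pattern in the listener output.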
EDIT2

I do see it going to -Infinity in the MBean if a transaction times out, i.e. if the listener doesn't exit within 60 seconds in my test.
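
If a consumer of this value (a dashboard, an alert) should not act on -Infinity, one option is to ignore non-finite readings. This is only a sketch of that idea under my own assumptions: `LagValues` and `finiteLag` are hypothetical names, not part of Kafka, Spring or Micrometer, and the argument stands in for whatever `metricValue()` or the gauge returns.

```java
import java.util.OptionalDouble;

public class LagValues {

    /**
     * Returns the reading as a double when it is a finite number,
     * or empty for -Infinity, +Infinity, NaN and non-numeric values.
     * Hypothetical helper for illustration only.
     */
    public static OptionalDouble finiteLag(Object metricValue) {
        if (metricValue instanceof Number) {
            double value = ((Number) metricValue).doubleValue();
            if (Double.isFinite(value)) {
                return OptionalDouble.of(value);
            }
        }
        return OptionalDouble.empty();
    }

    public static void main(String[] args) {
        System.out.println(finiteLag(9.0).isPresent());
        System.out.println(finiteLag(Double.NEGATIVE_INFINITY).isPresent());
    }
}
```

In the listener above this could wrap `maxLag.metricValue()` or `gauge.value()`, so a timed-out transaction shows up as "no reading" instead of a misleading number.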

-Infinity value is coming from. – Gary Russell