
spring-kafka exposes a useful metric for monitoring Kafka consumer lag, kafka_consumer_records_lag_max_records, but it does not seem to work for transactional consumers. Is there a specific configuration that enables the lag metric for transactional consumers?

I have configured my consumer group to use isolation level read_committed, and the metric is reported as -Inf: kafka_consumer_records_lag_max_records{client_id="listener-1",} -Inf

I added the apache-kafka tag because all Spring and Micrometer are doing is providing access to the kafka-clients' MBeans, which is where the -Infinity value is coming from. (Gary Russell)
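
To check the value at its source rather than through Micrometer, the MBean can be read directly over JMX. The following is only a minimal sketch, assuming it runs inside the same JVM as the consumer and that the client-level fetch metrics are registered under kafka.consumer:type=consumer-fetch-manager-metrics with a records-lag-max attribute; the class name LagMBeanReader is made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class LagMBeanReader {

    /**
     * Returns records-lag-max per client-id, read straight from the MBeans.
     * The object name and attribute name are assumptions about how
     * kafka-clients registers its consumer fetch metrics.
     */
    static Map<String, Double> readLagMax(MBeanServer server) throws Exception {
        // Matches only the client-level MBean (exactly these two keys),
        // not the per-topic or per-partition ones.
        ObjectName pattern = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*");
        Map<String, Double> result = new TreeMap<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            Object value = server.getAttribute(name, "records-lag-max");
            if (value instanceof Number) {
                result.put(name.getKeyProperty("client-id"),
                        ((Number) value).doubleValue());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Double> lag = readLagMax(ManagementFactory.getPlatformMBeanServer());
        if (lag.isEmpty()) {
            System.out.println("no Kafka consumer MBeans registered in this JVM");
        }
        lag.forEach((clientId, value) ->
                System.out.println(clientId + " records-lag-max=" + value));
    }
}
```

If the MBean itself already shows -Infinity, the value is not being introduced by Spring or Micrometer.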

1 Answer


What do you mean by "doesn't work"? I just tested it and it works fine...

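import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication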
public class So56540759Application {

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext context = SpringApplication.run(So56540759Application.class, args);
        System.in.read();
        context.close();
    }

    private MetricName lagNow;

    private MetricName lagMax;

    @Autowired
    private MeterRegistry meters;

    @KafkaListener(id = "so56540759", topics = "so56540759", clientIdPrefix = "so56540759",
            properties = "max.poll.records=1")
    public void listen(String in, Consumer<?, ?> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        Metric currentLag = metrics.get(this.lagNow);
        Metric maxLag = metrics.get(this.lagMax);
        System.out.println(in
                + " lag " + currentLag.metricName().name() + ":" + currentLag.metricValue()
                + " max " + maxLag.metricName().name() + ":" + maxLag.metricValue());
        Gauge gauge = meters.get("kafka.consumer.records.lag.max").gauge();
        System.out.println("lag-max in Micrometer: " + gauge.value());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56540759", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        Set<String> tags = new HashSet<>();
        FetcherMetricsRegistry registry = new FetcherMetricsRegistry(tags, "consumer");
        MetricNameTemplate temp = registry.recordsLagMax;
        this.lagMax = new MetricName(temp.name(), temp.group(), temp.description(),
                Collections.singletonMap("client-id", "so56540759-0"));
        temp = registry.partitionRecordsLag;
        Map<String, String> tagsMap = new LinkedHashMap<>();
        tagsMap.put("client-id", "so56540759-0");
        tagsMap.put("topic", "so56540759");
        tagsMap.put("partition", "0");
        this.lagNow = new MetricName(temp.name(), temp.group(), temp.description(), tagsMap);

        return args -> IntStream.range(0, 10).forEach(i -> template.send("so56540759", "foo" + i));
    }

}

2019-06-11 12:13:45.803  INFO 32187 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = so56540759-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = so56540759
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_committed
    ...
    transaction.timeout.ms = 60000
    ...

2019-06-11 12:13:45.840  INFO 32187 --- [o56540759-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so56540759-0]
foo0 lag records-lag:9.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo1 lag records-lag:8.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo2 lag records-lag:7.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo3 lag records-lag:6.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo4 lag records-lag:5.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo5 lag records-lag:4.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo6 lag records-lag:3.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo7 lag records-lag:2.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo8 lag records-lag:1.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo9 lag records-lag:0.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0

EDIT2

I do see it going to -Infinity in the MBean if a transaction times out, i.e. if the listener doesn't exit within 60 seconds in my test (transaction.timeout.ms = 60000 in the config above).
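
One possible reading of this, which I have not verified against the kafka-clients source for this version: a "max" statistic computed over a time window of samples can fall back to negative infinity once every sample in the window has expired and nothing new has been recorded. The sketch below is a simplified stand-in written for illustration, not Kafka's actual implementation; WindowedMax, expireAll and usableLag are hypothetical names. It also shows one way a monitoring client could treat non-finite readings as "no data" instead of as a real lag value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalDouble;

public class WindowedMaxSketch {

    /** Simplified stand-in for a windowed "max" statistic (not Kafka's class). */
    static final class WindowedMax {
        private final List<Double> samples = new ArrayList<>();

        void record(double value) {
            samples.add(value);
        }

        /** Simulates every sample falling out of the time window. */
        void expireAll() {
            samples.clear();
        }

        /** With no samples left, the starting value is returned unchanged. */
        double measure() {
            double max = Double.NEGATIVE_INFINITY;
            for (double sample : samples) {
                max = Math.max(max, sample);
            }
            return max;
        }
    }

    /** Treats -Infinity, +Infinity and NaN as "no data". */
    static OptionalDouble usableLag(double raw) {
        return Double.isFinite(raw) ? OptionalDouble.of(raw) : OptionalDouble.empty();
    }

    public static void main(String[] args) {
        WindowedMax lagMax = new WindowedMax();
        lagMax.record(9.0);
        lagMax.record(8.0);
        System.out.println("while polling: " + lagMax.measure());

        lagMax.expireAll(); // e.g. the listener is stuck and nothing is recorded
        System.out.println("after the window empties: " + lagMax.measure());
        System.out.println("usable value present: " + usableLag(lagMax.measure()).isPresent());
    }
}
```

Under that assumption, -Infinity would mean "nothing was measured recently" rather than "the lag is negative", so alerts built on the gauge are probably better off ignoring non-finite values.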
