0
votes

I just started with a simple example to read the jmx metrics and used the simple values of HeapMemoryUsage, CPUTime etc to get the feel for it. I need to try and access the kafka server / consumer metrics specifically lag which I can see is visible as a key in the jconsole app under FetcherLagMetrics-ConsumerLag. But programmaticaly I get the following error:

javax.management.InstanceNotFoundException: kafka.consumer:type=consumer- 
  fetch-manager-metrics

This tells me that the consumer-fetch-manager metrics is the issue as it is not present even in the jconsole. I changed it to the following & still the same issue:

consumerBean = jmxCon.getMBeanServerConnection().getAttribute(new 
   ObjectName("kafka.server:type=FetcherLagMetrics"),"ConsumerLag");
            cd = (CompositeData) consumerBean;

The code trying to access these values is as follows:

        jmxCon.getMBeanServerConnection().invoke(new 
         ObjectName("java.lang:type=Memory"), "gc", null, null);

        for (int i = 0; i < 100; i++) {

            //get an instance of the HeapMemoryUsage Mbean
            memoryMbean = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage");
            cd = (CompositeData) memoryMbean;

            //get an instance of the OperatingSystem Mbean
            osMbean = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("java.lang:type=OperatingSystem"),"ProcessCpuTime");

            //get an instance of the kafka metrics Mbean
            consumerBean = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics"),"MaxLag");
            cd = (CompositeData) consumerBean;

            consumerBeanII = jmxCon.getMBeanServerConnection().getAttribute(new ObjectName("kafka.server:type=FetcherLagMetrics,name=ConsumerLag"),"Lag");


            System.out.println("Used memory: " + " " + cd.get("MaxLag") + " Used cpu: " + consumerBean); //print memory usage

            tempMemory = tempMemory + Long.parseLong(cd.get("used").toString());
            Thread.sleep(1000); //delay for one second

        }

It fails at line consumerBean = ......Could someone please explain or provide the correct way to access the kafka metrics using jmx /JMI implementation?

1
Did you try to provide a Client id? kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.w]+)Giorgos Myrianthous
I don't understand the hierarchy in MBeans so not sure which one is the client id as there are multiple subfolders - kafka.server ---> FetcherLagMetrics --> ConsumerLag ---> ReplicaFetcherThread-0-2 ---> data.in ---> 5 ---> Attributes ---> ValuevbNewbie

1 Answers

1
votes

I'm not sure your version of Kafka, but when I look at mine (v1.1.0) using JConsole there is no such bean for consumer lag that you describe. Therefore I think it is expected that your JMX query will fail.

Instead, I believe this kind of information was moved to the Kafka Admin API interface. So you must use this now to get lag settings, if you don't want to use kafka-consumer-groups.sh command-line utility.

For us we only need these stats for monitoring, so we are using https://github.com/danielqsj/kafka_exporter to obtain the info for Prometheus.