
I am trying to expose a Prometheus Gauge in a Flink app:

@transient def metricGroup: MetricGroup = getRuntimeContext.getMetricGroup
    .addGroup("site", site)
    .addGroup("sink", counterBaseName)
@transient var failedCounter: Counter = _


def expose(metricName: String, gaugeValue: Int, context: SinkFunction.Context[_]): Unit = {
    try {
      metricGroup.addGroup("hostname", metricName).gauge[Int, ScalaGauge[Int]]("test", ScalaGauge[Int](() => gaugeValue))
    } catch {
      case _: Throwable => failedCounter.inc()
    }
}

The app runs fine locally and exposes the metrics without any problem.

When I moved it to production, I encountered the following exception in the Flink task manager:

WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while registering metric. java.lang.NullPointerException

I am not sure what I am missing here.

Why does the local app expose the metrics, while on the cluster it fails to register the gauge?

I already use Prometheus to expose other metrics from Flink, for example failedCounter (in the code above), which is a counter.

This is the first time I have exposed a gauge in Flink, so I bet something in my implementation is broken.

Please help.

It would probably be easier to track down the error if you could paste the whole stack trace instead of just the exception. - Dominik Wosiński
OK, I think I found the problem, but I am not sure how to solve it. Name collision: Group already contains a Metric with the name 'rejected_by_server'. Metric will not be reported.[10.199.193.172, taskmanager, 0536b010099d06a2693dcad8833ab060, rtbRejectedCounter, Sink: PrometheusSink, 0, site, null, sink, PrometheusSink, hostname, was-prd-web60] How can I do a "get or create" for a gauge or any other metric? What is interesting is why it worked locally with no errors. - EyalP
The exception does not match your code; the code has no reference to 'rejected_by_server'. Is 'gaugeValue' guaranteed to be non-null? - Chesnay Schepler
Correct, I changed the gauge name from rejected_by_server to test; other than that the code is the same. The value cannot be null (it can be 0). I think I should work with MetricRegistry somehow, but I have not found any examples so far. - EyalP
How often are you calling expose()? (You are only supposed to register a metric exactly once per subtask.) - Chesnay Schepler
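
To illustrate the "register exactly once per subtask" point from the last comment, here is a minimal sketch of a get-or-create pattern for gauges. It is not taken from the thread: GaugeCache and its register callback are hypothetical names, and the Flink wiring in the trailing comment assumes the metricGroup and ScalaGauge from the question. The idea is that a gauge is registered only the first time a hostname is seen, and later calls only update the value the gauge reads.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

/** Hypothetical helper: registers one gauge per key, then only updates its value.
  * `register` receives the key and a function that reads the latest value. */
final class GaugeCache(register: (String, () => Int) => Unit) {
  private val holders = mutable.Map.empty[String, AtomicInteger]

  def update(key: String, value: Int): Unit = {
    val holder = holders.getOrElseUpdate(key, {
      val h = new AtomicInteger(value)
      register(key, () => h.get()) // runs only the first time `key` is seen
      h
    })
    holder.set(value) // every call refreshes the value the gauge will report
  }
}

// Assumed wiring inside the sink from the question (create the cache in open(),
// since it is per-subtask state and not meant to be serialized):
//
//   cache = new GaugeCache((host, read) =>
//     metricGroup.addGroup("hostname", host)
//       .gauge[Int, ScalaGauge[Int]]("test", ScalaGauge[Int](read)))
//
//   def expose(metricName: String, gaugeValue: Int): Unit =
//     cache.update(metricName, gaugeValue)
```

With this shape, calling expose() repeatedly for the same hostname should no longer trigger the "Name collision" warning, because the gauge call happens once per hostname per subtask. Whether it also removes the NullPointerException depends on the full stack trace, which is not shown here.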