I am trying to expose a Prometheus gauge from a Flink app:
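```scala
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.metrics.{Counter, MetricGroup}
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Base group for this sink: <scope>.site.<site>.sink.<counterBaseName>
@transient def metricGroup: MetricGroup = getRuntimeContext.getMetricGroup
  .addGroup("site", site)
  .addGroup("sink", counterBaseName)

@transient var failedCounter: Counter = _

def expose(metricName: String, gaugeValue: Int, context: SinkFunction.Context[_]): Unit = {
  try {
    // Registers a gauge named "test" under the group hostname.<metricName>
    metricGroup
      .addGroup("hostname", metricName)
      .gauge[Int, ScalaGauge[Int]]("test", ScalaGauge[Int](() => gaugeValue))
  } catch {
    case _: Throwable => failedCounter.inc()
  }
}
```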
The app runs fine locally and exposes the metrics without any problem.
When I move it to production, I get the following exception in the Flink task manager:
WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while registering metric. java.lang.NullPointerException
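One thing that may be worth ruling out (an assumption, not a confirmed cause): the scope printed in the later name-collision warning contains `site, null`, which suggests `site` is null on the cluster even though it is set locally. Below is a minimal sketch with a hypothetical helper, `MetricLabels.orUnknown`, that substitutes a placeholder before the value is passed to `addGroup`:

```scala
// Hypothetical helper (not part of Flink): replaces a null or empty
// metric group value with a fixed placeholder so the scope never contains null.
object MetricLabels {
  def orUnknown(value: String): String =
    Option(value).filter(_.nonEmpty).getOrElse("unknown")
}
```

In the sink, the group would then be built with `.addGroup("site", MetricLabels.orUnknown(site))`.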
I'm not sure what I am missing here.
Why does the local app expose the metrics while on the cluster it fails to register the gauge?
I already use Prometheus to expose other metrics from Flink, for example failedCounter in the code above, which is a counter.
This is the first time I have exposed a gauge in Flink, so I suspect something in my implementation is broken.
Please help.
Name collision: Group already contains a Metric with the name 'rejected_by_server'. Metric will not be reported.[10.199.193.172, taskmanager, 0536b010099d06a2693dcad8833ab060, rtbRejectedCounter, Sink: PrometheusSink, 0, site, null, sink, PrometheusSink, hostname, was-prd-web60]
How can I do a get-or-create for a gauge (or for any metric)? What I find interesting is why it worked locally with no errors. – EyalP
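A minimal get-or-create sketch, assuming the goal is one gauge per `metricName` whose value changes over time. The hypothetical `GaugeCache` registers a gauge the first time a key is seen and afterwards only updates the value that gauge reads, so repeated `expose` calls should no longer try to register the same name again. The Flink registration is passed in as a function so the class has no Flink dependency; the wiring in the comment reuses the `MetricGroup.gauge` and `ScalaGauge` calls from the question.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical helper: registers each gauge once, then only updates its value.
// Intended wiring inside the sink (an assumption, not tested here):
//   @transient private lazy val gauges = new GaugeCache((name, read) =>
//     metricGroup.addGroup("hostname", name)
//       .gauge[Int, ScalaGauge[Int]]("test", ScalaGauge[Int](read)))
//   def expose(metricName: String, gaugeValue: Int, ...): Unit =
//     gauges.update(metricName, gaugeValue)
final class GaugeCache(register: (String, () => Int) => Unit) {
  // Latest value per key; each registered gauge reads its value from here.
  private val values = TrieMap.empty[String, Int]

  def update(key: String, value: Int): Unit =
    values.putIfAbsent(key, value) match {
      // First time this key is seen: register a gauge that reads the current value.
      case None    => register(key, () => values(key))
      // Gauge already registered: just store the new value.
      case Some(_) => values.update(key, value)
    }
}
```

A concurrent `TrieMap` is used because the metric reporter reads gauges from its own thread while the sink updates the values.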