2
votes

I'm using Flink 1.9.1 with PrometheusPushGateway to report my metrics. The jobName the metrics are reported with is defined in the flink-conf.yaml file which makes the jobName identical for all jobs who run on the cluster, but I want a different jobName to be reported for every running job. To do so, I tried overriding the config's value inside the job before executing the Stream:

Configuration conf = GlobalConfiguration.loadConfiguration();
    conf.setString(
            "metrics.reporter.promgateway.jobName",
            conf.getString("metrics.reporter.promgateway.jobName", "") + "-" + pipeline
    );
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(conf);

When pipeline is a String variable.

When running the job locally, it worked. But now I'm running flink in High Availability mode and it doesn't work anymore :( The config I override in the code is ignored and only the value in the cluster's flink-conf.yaml file is used.

So how can I change the jobName per job? And if I can't, is there a way to set additional Labels when reporting the metrics? Because I haven't seen an option for that as well.

Thanks :)

1

1 Answers

0
votes

You can use the following steps to achieve this:

  1. Pass jobName as a command argument like: --jobName MyJobName

  2. Set global parameters:

public static void main(String[] args) throws Exception {
    final ParameterTool command = ParameterTool.fromArgs(args);
    String jobName = command.getRequired("jobName");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Configuration globalConfiguration = new Configuration();
    globalConfiguration.setString("jobName", jobName);
    env.getConfig().setGlobalJobParameters(globalConfiguration);

}
  1. Use it:
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("jobName");

Here is a link that may help you too: https://ci.apache.org/projects/flink/flink-docs-stable/dev/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application