4
votes

I currently have Flink set up with a job running on EMR, and I'm now trying to add monitoring by sending metrics to Prometheus.

I have come across an issue with running Flink on EMR. I'm using Terraform to provision EMR (I run Ansible afterwards to download and run a job). Out of the box, EMR's Flink distribution does not appear to include the optional jars (flink-metrics-prometheus, flink-cep, etc.).

Flink's documentation says:

"In order to use this reporter you must copy /opt/flink-metrics-prometheus-1.6.1.jar into the /lib folder of your Flink distribution" https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

But when I log into the EMR master node, neither /etc/flink nor /usr/lib/flink has a directory called opt, and I cannot see flink-metrics-prometheus-1.6.1.jar anywhere.

I know Flink has other optional libs, such as flink-cep, that you usually have to copy if you want to use them, but I'm not sure how to do this when using EMR.

This is the exception I get, which I believe is because it cannot find the metrics jar on its classpath:

java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:144)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:419)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:276)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:227)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:191)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
    at org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint.main(YarnSessionClusterEntrypoint.java:137)

EMR resource in Terraform:

resource "aws_emr_cluster" "emr_flink" {
  name          = "ce-emr-flink-arn"
  release_label = "emr-5.20.0" # 5.21.0 is not found, could be a region thing
  applications  = ["Flink"]

  ec2_attributes {
    key_name                          = "ce_test"
    subnet_id                         = "${aws_subnet.ce_test_subnet_public.id}"
    instance_profile                  = "${aws_iam_instance_profile.emr_profile.arn}"
    emr_managed_master_security_group = "${aws_security_group.allow_all_vpc.id}"
    emr_managed_slave_security_group  = "${aws_security_group.allow_all_vpc.id}"
    additional_master_security_groups = "${aws_security_group.external_connectivity.id}"
    additional_slave_security_groups  = "${aws_security_group.external_connectivity.id}"
  }

  ebs_root_volume_size = 100
  master_instance_type = "m4.xlarge"
  core_instance_type   = "m4.xlarge"
  core_instance_count  = 2

  service_role = "${aws_iam_role.iam_emr_service_role.arn}"

  configurations_json = <<EOF
[
  {
    "Classification": "flink-conf",
    "Properties": {
        "parallelism.default": "8",
        "state.backend": "RocksDB",
        "state.backend.async": "true",
        "state.backend.incremental": "true",
        "state.savepoints.dir": "file:///savepoints",
        "state.checkpoints.dir": "file:///checkpoints",
        "web.submit.enable": "true",
        "metrics.reporter.promgateway.class": "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter",
        "metrics.reporter.promgateway.host": "${aws_instance.monitoring.private_ip}",
        "metrics.reporter.promgateway.port": "9091",
        "metrics.reporter.promgateway.jobName": "ce-test",
        "metrics.reporter.promgateway.randomJobNameSuffix": "true",
        "metrics.reporter.promgateway.deleteOnShutdown": "false"
    }
  }
]
EOF
}

I suspect I may have to download the jar in the bootstrap stage, but I wanted to check first and see whether there are any examples of this being done.


2 Answers

2
votes

I haven't used Terraform, but note that you typically need to provision (set up jars) on both the master and the slaves in EMR. One way to figure out where EMR thinks jars should go is to log onto a slave when a job is running, do ps auxwww | grep java, find the TaskManager process, look at the jars added to the classpath when it launched, and find where those are located on the server. Or at least that worked for me in the past.
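
A minimal sketch of the classpath inspection described above, assuming the classpath shows up as a -cp or -classpath argument in the ps output. The function name classpath_entries is hypothetical. If the JVM receives its classpath through the CLASSPATH environment variable instead, this prints nothing and you would need to inspect the process environment.

```shell
# Sketch only: read `ps` output on stdin and print every entry of each
# -cp / -classpath argument on its own line.
# Assumes paths contain no spaces (fields are split on whitespace).
classpath_entries() {
  awk '{
    for (i = 1; i < NF; i++) {
      if ($i == "-cp" || $i == "-classpath") {
        n = split($(i + 1), parts, ":")
        for (j = 1; j <= n; j++) print parts[j]
      }
    }
  }'
}
```

On a node with a running job, this could be used as `ps auxwww | grep java | classpath_entries`, after which the directories of the printed jars show where the distribution expects libraries to live.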

1
votes

I selected EMR release emr-5.24.0, and I am successfully monitoring with the InfluxDB reporter .jar.

I copied the .jar file to the /usr/lib/flink/lib folder and restarted the Flink cluster with the following bash command (with sudo permission).

/usr/lib/flink/bin/stop-cluster.sh && /usr/lib/flink/bin/start-cluster.sh

I assume you can solve your problem with the same steps for Prometheus. The Prometheus reporter jar is present under /usr/lib/flink/opt on this release:

[ec2-user@ip-10-0-11-17 ~]$ cd /usr/lib/flink/opt/flink-metrics-
flink-metrics-datadog-1.8.0.jar     flink-metrics-influxdb-1.8.0.jar    flink-metrics-slf4j-1.8.0.jar
flink-metrics-graphite-1.8.0.jar    flink-metrics-prometheus-1.8.0.jar  flink-metrics-statsd-1.8.0.jar


[ec2-user@ip-10-0-11-17 ~]$ ll /usr/lib/flink/opt/flink-metrics-prometheus-1.8.0.jar
-rw-r--r-- 1 root root 101984 may 14 19:21 /usr/lib/flink/opt/flink-metrics-prometheus-1.8.0.jar


[ec2-user@ip-10-0-11-17 ~]$ uname -a
Linux ip-10-0-11-17 4.14.114-83.126.amzn1.x86_64 #1 SMP Tue May 7 02:26:58 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
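
The copy step from this answer could be sketched as a small shell function. This assumes the opt/ and lib/ layout shown in the listing above (seen on emr-5.24.0; other releases may differ). The function name install_optional_jar is hypothetical.

```shell
# Sketch only: copy optional Flink jars matching a glob from opt/ into lib/
# so they are picked up on the classpath after a restart.
install_optional_jar() {
  local flink_home="$1"   # e.g. /usr/lib/flink (assumed layout)
  local pattern="$2"      # e.g. 'flink-metrics-prometheus-*.jar'
  local status=1          # stays 1 unless at least one jar is copied
  local jar

  # $pattern is intentionally unquoted so the shell expands the glob.
  for jar in "$flink_home"/opt/$pattern; do
    # An unmatched glob is left as a literal string; skip it.
    [ -e "$jar" ] || continue
    cp "$jar" "$flink_home/lib/" || return 1
    echo "copied $(basename "$jar")"
    status=0
  done

  return "$status"
}
```

For example, `install_optional_jar /usr/lib/flink 'flink-metrics-prometheus-*.jar'` run with sudo, followed by a cluster restart. As the other answer notes, the jar probably needs to be in place on the master and on the worker nodes.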