I have a Flink job that reads from Kafka (v0.9) and writes to Redis. I want to monitor the `records-consumed-rate` and `records-lag-max` metrics emitted by the Kafka consumer, which Flink should be able to forward. In this case, I am forwarding them to Datadog.
When I start the job with a parallelism of 1, I see these metrics emitted just fine. However, if I set the parallelism greater than 1, these metrics are no longer forwarded. The job is still running when parallelism > 1: I can see the entries being written to Redis.
I'm running Flink (v1.6.2) on AWS EMR:
- master node: (1) m4.large
- core node: (1) c4.2xlarge
- num.task.managers: 1
- slots.per.task.manager: 7
- parallelism: 7
The parallelism is set with `streamExecutionEnvironment.setParallelism()`. Each Kafka consumer is instantiated with the same `group.id` and a unique `client.id`.
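To make the consumer setup concrete, here is a minimal sketch that builds one `Properties` object per consumer, all sharing a `group.id` but each with its own `client.id`. The broker address, the group name `redis-writer`, and the `<group>-<index>` naming scheme for `client.id` are hypothetical placeholders, not taken from the actual job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ConsumerProps {
    /**
     * Builds one Properties object per consumer.
     * All consumers share the same group.id; each gets its own client.id.
     * The "<group>-<index>" client.id scheme is a hypothetical example.
     */
    static List<Properties> build(String bootstrapServers, String groupId, int count) {
        List<Properties> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", bootstrapServers);
            // Same consumer group for every consumer.
            props.setProperty("group.id", groupId);
            // Unique client.id per consumer, derived from the index.
            props.setProperty("client.id", groupId + "-" + i);
            result.add(props);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 consumers, matching the parallelism of 7 described above.
        for (Properties p : build("localhost:9092", "redis-writer", 7)) {
            System.out.println(p.getProperty("group.id") + " / " + p.getProperty("client.id"));
        }
    }
}
```

Each of these `Properties` objects would then be passed to a `FlinkKafkaConsumer09` constructor together with the topic name and a deserialization schema.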
The Datadog (DD) agent is running fine on the cluster, and many other metrics, such as `numberOfCompletedCheckpoints` and `upTime`, are being emitted.
Is there any reason Flink would not forward these metrics from Kafka when the parallelism is greater than 1?
Update:
I also tried sending a custom DD metric (`counter.inc()`) from the Redis `RichSinkFunction`. When parallelism=1, the metric is sent fine. When parallelism=7, the metric is not sent, even though the code that increments it is being executed (I added a debug line to confirm). So the problem does not seem to be limited to the metrics forwarded from Kafka.