
I have an operator with parallelism=256 running on 128 task managers. Every time a checkpoint fails, it fails at the same subtask of this operator. For example, it is always subtask 129 that gets stuck and blocks checkpointing. I want to understand what happened to this subtask by examining the logs of the task manager that subtask 129 is running on. Is there a way in Flink to map a subtask ID to the corresponding task manager?


2 Answers


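The taskmanager.log files contain the names of the deployed tasks, including their subtask index. You can simply search all taskmanager.log files for TASK_NAME (129/256). Keep in mind that the number in parentheses is 1-based (subtask index + 1), so if 129 is a 0-based index, as in the REST API, the log entry reads TASK_NAME (130/256).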
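Here is a minimal sketch of that search in Scala (2.13, standard library only). It assumes the taskmanager.log files have already been copied into one local directory. `SubtaskLogSearch`, `findSubtask` and `taskManagerLogs` are hypothetical names. The only part taken from the answer is the `TASK_NAME (n/parallelism)` pattern.

```scala
import java.nio.file.{Files, Path}
import scala.io.Source
import scala.jdk.CollectionConverters._
import scala.util.Using

object SubtaskLogSearch {
  // Returns every (file, line) pair whose line mentions the task name with the
  // given "(number/parallelism)" suffix, e.g. "Map -> Sink (129/256)".
  // `number` is the value exactly as printed in the log (1-based).
  def findSubtask(logFiles: Seq[Path], taskName: String, number: Int, parallelism: Int): Seq[(Path, String)] = {
    val needle = s"$taskName ($number/$parallelism)"
    logFiles.flatMap { file =>
      Using.resource(Source.fromFile(file.toFile, "UTF-8")) { src =>
        src.getLines().filter(_.contains(needle)).map(line => (file, line)).toList
      }
    }
  }

  // Lists the taskmanager*.log files in a directory (hypothetical local copy of the logs).
  def taskManagerLogs(dir: Path): Seq[Path] =
    Using.resource(Files.list(dir)) { stream =>
      stream.iterator().asScala
        .filter(_.getFileName.toString.matches("taskmanager.*\\.log"))
        .toList
        .sortBy(_.toString)
    }
}
```

For example, `SubtaskLogSearch.findSubtask(SubtaskLogSearch.taskManagerLogs(dir), "Map -> Sink", 129, 256)` returns the matching lines together with the file they came from. Each file corresponds to one task manager, so the file name identifies where the subtask ran.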


I found a solution that is not trivial but works: it builds the required map programmatically at runtime.

The main idea is that the REST endpoint /jobs/:jobid/vertices/:vertexid returns the necessary information for a specific vertex in the following format:

```json
{
  "id": "804e...",
  "name": "Map -> Sink",
  ...
  "subtasks": [
    {
      "subtask": 0,
      "host": "ip-10-xx-yy-zz:36ddd"
    },
    ...
  ]
}
```
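Once you have this response body, the `subtasks` array can be turned into a subtask → host map. To stay dependency-free, the sketch below uses a regular expression instead of a JSON parser. This assumes that inside each subtask object the `"subtask"` field comes before `"host"` and that no nested object sits between them. That is an assumption, not something the endpoint guarantees, so real code should use a proper JSON library. `VertexSubtaskHosts` and `subtaskToHost` are hypothetical names.

```scala
import scala.util.matching.Regex

object VertexSubtaskHosts {
  // Matches one subtask entry: the numeric "subtask" index, then (without
  // crossing a '{' or '}') the "host" string of the same entry.
  // Note: "subtasks" (plural) does not match because the closing quote must follow "subtask".
  private val SubtaskHost: Regex =
    "\"subtask\"\\s*:\\s*(\\d+)[^{}]*?\"host\"\\s*:\\s*\"([^\"]*)\"".r

  // Builds subtask index -> host ("host:port") from a /jobs/:jobid/vertices/:vertexid body.
  def subtaskToHost(vertexJson: String): Map[Int, String] =
    SubtaskHost.findAllMatchIn(vertexJson)
      .map(m => m.group(1).toInt -> m.group(2))
      .toMap
}
```

With the response from the question's job, `VertexSubtaskHosts.subtaskToHost(body).get(129)` would give the host of subtask 129. The index here is the 0-based `"subtask"` value from the REST response.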

The main difficulty was getting the web interface URL programmatically. I got it this way (there is probably a more elegant solution):

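```scala
import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

// Inside a rich function (e.g. a RichMapFunction), where getRuntimeContext is available.
private def webInterfaceUrl(): Option[String] = {
  // Reflection hack: StreamingRuntimeContext keeps the task's environment in a
  // private field named "taskEnvironment", so this may break across Flink versions.
  val env = FieldUtils
    .readField(getRuntimeContext.asInstanceOf[StreamingRuntimeContext], "taskEnvironment", true)
    .asInstanceOf[RuntimeEnvironment]

  try {
    // Build a REST client from the task manager's configuration; "rest" is the cluster id.
    val client = new RestClusterClient[String](env.getTaskManagerInfo.getConfiguration, "rest")
    try Some(client.getWebInterfaceURL)
    finally client.close()
  } catch {
    case e: Exception =>
      println("Failed to get cluster client:")
      e.printStackTrace()
      None
  }
}
```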

With the web interface URL, I made an HTTP request to /jobs/:jobid/vertices/:vertexid and built the map from the subtasks array in the response.
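Here is a minimal sketch of that HTTP call using the JDK 11+ `java.net.http.HttpClient`, so no extra dependencies are needed. The job ID and vertex ID are plain string parameters, because the answer does not say how they were obtained inside the job. `VertexDetailsClient` and `fetchVertexJson` are hypothetical names. `webUrl` is the value returned by `getWebInterfaceURL`.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object VertexDetailsClient {
  // Hypothetical helper: GET <webUrl>/jobs/<jobId>/vertices/<vertexId> and return the raw JSON body.
  def fetchVertexJson(webUrl: String, jobId: String, vertexId: String): String = {
    val base = webUrl.stripSuffix("/")
    val request = HttpRequest.newBuilder(URI.create(s"$base/jobs/$jobId/vertices/$vertexId")).GET().build()
    // Plain HTTP/1.1 client; no authentication or TLS handling is attempted here.
    val client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    if (response.statusCode() != 200)
      throw new IllegalStateException(s"REST call failed with HTTP ${response.statusCode()}: ${response.body()}")
    response.body()
  }
}
```

The returned body has the shape of the vertex JSON shown earlier in this answer. Parsing its `subtasks` array with any JSON library gives the subtask → host map.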