I have an operator with parallelism=256 running on 128 task managers. Every time a checkpoint fails, it fails at the same subtask of this operator; for example, it is always subtask 129 that gets stuck and blocks checkpointing. I want to understand what happened to this subtask by examining the logs of the task manager it runs on. Is there a way in Flink to map a subtask ID to the corresponding Task Manager?
2 Answers
I found a non-trivial but working solution that builds the required map programmatically at runtime.
The main idea is that the REST endpoint /jobs/:jobid/vertices/:vertexid returns the necessary information for a specific vertex in the following format:
{
  "id": "804e...",
  "name": "Map -> Sink",
  ...
  "subtasks": [
    {
      "subtask": 0,
      "host": "ip-10-xx-yy-zz:36ddd"
    },
    ...
  ]
}
The main difficulty was getting the web interface URL programmatically. I obtained it this way (there is probably a more elegant solution):
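import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

// Read the private "taskEnvironment" field via reflection (forceAccess = true).
val env = FieldUtils
  .readField(getRuntimeContext.asInstanceOf[StreamingRuntimeContext], "taskEnvironment", true)
  .asInstanceOf[RuntimeEnvironment]
try {
  println("trying to get cluster client...")
  // Build a REST client from the task manager's own configuration.
  val client = new RestClusterClient[String](env.getTaskManagerInfo.getConfiguration, "rest")
  return client.getWebInterfaceURL
} catch {
  case e: Exception =>
    println("Failed to get cluster client : ")
    e.printStackTrace()
}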
Given the web interface URL, I simply made an HTTP call to it and constructed the map from the response.
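A minimal sketch of that last step, assuming Java 11+ (for java.net.http.HttpClient) and a response shaped like the sample above. The names SubtaskHostMap, parse and fetch are hypothetical, not Flink API. The regex-based extraction is only an illustration and assumes that "subtask" appears before "host" within each subtask object, with no nested object between them; a proper JSON library would be more robust.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object SubtaskHostMap {
  // Hypothetical helper, not part of Flink.
  private val SubtaskHost =
    ("\"subtask\"\\s*:\\s*(\\d+)" +        // subtask index
     "[^{}]*?" +                           // other scalar fields of the same object
     "\"host\"\\s*:\\s*\"([^\"]*)\"").r    // host string, e.g. "hostname:port"

  /** Extracts subtask index -> host from the vertex details JSON. */
  def parse(json: String): Map[Int, String] =
    SubtaskHost.findAllMatchIn(json)
      .map(m => m.group(1).toInt -> m.group(2))
      .toMap

  /** Fetches the vertex details; baseUrl is the web interface URL obtained earlier. */
  def fetch(baseUrl: String, jobId: String, vertexId: String): String = {
    val uri = URI.create(s"${baseUrl.stripSuffix("/")}/jobs/$jobId/vertices/$vertexId")
    val request = HttpRequest.newBuilder(uri).GET().build()
    HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
      .body()
  }
}
```

With this, SubtaskHostMap.parse(SubtaskHostMap.fetch(url, jobId, vertexId)) would yield a map in which looking up key 129 gives the host of the stuck subtask, provided the assumptions above hold for your Flink version.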