I run my Flink job on YARN, and a small number of subtasks show a long checkpoint alignment duration.
What could cause this?
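For exactly-once semantics, Flink aligns the input streams at every operator that receives more than one input stream. A long alignment duration therefore means that the subtask received the checkpoint barrier on some of its input channels much later than on the others, and had to wait for the late barrier(s) before it could take its snapshot.
Documentation about alignment can be found here, and there are also ways to monitor checkpointing.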
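To make the idea concrete, here is a minimal sketch in plain Python (not Flink code) that treats the alignment duration of a subtask as the gap between the first and the last barrier arrival across its input channels, and flags subtasks whose gap is far above the median. The function names, the `factor` threshold, and the timestamps are all made up for illustration; in practice you would read the per-subtask alignment numbers from Flink's checkpoint monitoring instead.

```python
from statistics import median


def alignment_duration_ms(barrier_arrivals_ms):
    """Gap between the first and last barrier arrival on one subtask's inputs."""
    return max(barrier_arrivals_ms) - min(barrier_arrivals_ms)


def find_slow_subtasks(arrivals_by_subtask, factor=3.0):
    """Return (durations, slow) where slow lists subtasks far above the median.

    `factor` is an arbitrary, hypothetical threshold, not a Flink setting.
    """
    durations = {
        index: alignment_duration_ms(arrivals)
        for index, arrivals in arrivals_by_subtask.items()
    }
    typical = median(durations.values())
    # max(typical, 1) avoids a zero threshold when most subtasks align instantly.
    slow = [
        index
        for index, duration in sorted(durations.items())
        if duration > factor * max(typical, 1)
    ]
    return durations, slow


# Hypothetical barrier arrival times (ms) per subtask, one value per input channel.
arrivals = {
    0: [1000, 1004, 1010],
    1: [1001, 1003, 1008],
    2: [1002, 1005, 4500],  # one channel delivers its barrier very late
    3: [1000, 1006, 1009],
}

durations, slow = find_slow_subtasks(arrivals)
print(durations)  # alignment duration per subtask
print(slow)       # subtasks that waited unusually long
```

In this made-up data only subtask 2 stands out, because a single late input channel is enough to hold up the whole alignment for that subtask.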
More specifically, the possible causes are: