2
votes

I've got a Flink KeyedCoProcessFunction that registers processing-time timers in a larger Flink streaming job, and I'm trying to write unit tests for the entire job using the Flink MiniCluster. However, I can't get the onTimer() callback in the KeyedCoProcessFunction to trigger.

Has anyone gotten this to work? Did it require any special configuration?

Switching to event time works fine, so I'm wondering whether processing-time timers just don't work with the Flink MiniCluster or whether there is something wrong with my implementation.

I wrote a simple test in Scala to see if I could get this to work.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.streaming.runtime.util.TestListResultSink
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.flink.util.Collector
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.slf4j.LoggerFactory

/**
 * Runs a small keyed pipeline on a local Flink MiniCluster and checks that the
 * processing-time timers registered by [[MyProccesor]] actually reach `onTimer`.
 *
 * The pipeline is `MyLongSource(100) -> keyBy(identity) -> MyProccesor -> sink`.
 * `MyProccesor` only emits from `onTimer`, so any record in the sink proves that
 * a timer fired before the job shut down.
 */
class TimerTest extends AnyFlatSpec with BeforeAndAfter {

  private val SlotsPerTaskMgr = 1

  // One task manager with one slot; the job below runs with parallelism 1.
  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(SlotsPerTaskMgr)
    .setNumberTaskManagers(1)
    .build)

  // Invoke the cluster resource's setup/teardown hooks around every test.
  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "MiniCluster" should "trigger onTimer" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Explicit type information for the Long records flowing through the pipeline.
    implicit val longTypeInfo: TypeInformation[Long] = TypeInformation.of(classOf[Long])

    val sink = new TestListResultSink[Long]

    env.addSource(new MyLongSource(100))
      .keyBy(v => v)
      .process(new MyProccesor())
      .addSink(sink)

    // Blocks until the bounded source returns and the job has finished.
    env.execute()

    val received = sink.getResult.size()
    println("Received " + received + " output records.")

    // Without an assertion the test would pass even if no timer ever fired.
    assert(received > 0, "expected at least one record emitted from onTimer()")
  }

}

/**
 * Keyed process function used to probe processing-time timers.
 *
 * For every input value that is a multiple of 10 it registers a processing-time
 * timer at "current processing time + 10". Nothing is emitted from
 * `processElement`; each timer that fires emits its own timestamp from `onTimer`.
 */
class MyProccesor extends KeyedProcessFunction[Long, Long, Long] {

  // Transient and lazy so the logger is not part of the serialized function
  // instance; it is looked up again on first use wherever the function runs.
  @transient private lazy val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Logs the incoming value and, for multiples of 10, schedules a timer.
   *
   * @param value the incoming record (also the key in the test pipeline)
   * @param ctx   context giving access to the timer service
   * @param out   collector for output records (unused here)
   */
  override def processElement(
                               value: Long,
                               ctx: KeyedProcessFunction[Long, Long, Long]#Context,
                               out: Collector[Long]): Unit = {
    // Read the clock once so the logged values and the registered timer agree.
    val now = ctx.timerService().currentProcessingTime()
    log.info("Received {} at {}", value, now)
    if (value % 10 == 0) {
      val fireAt = now + 10
      log.info("Scheduling processing timer for {}", fireAt)
      ctx.timerService().registerProcessingTimeTimer(fireAt)
    }
  }

  /**
   * Emits the timestamp of the timer that fired.
   *
   * @param timestamp the timestamp the firing timer was registered for
   * @param ctx       timer context (unused here)
   * @param out       collector receiving the timestamp
   */
  override def onTimer(
                        timestamp: Long,
                        ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
                        out: Collector[Long]): Unit = {
    log.info("Received onTimer at {}", timestamp)
    out.collect(timestamp)
  }
}

/**
 * Bounded test source that emits the values 1 to `n` (as `Long`) and then
 * lingers for one second before returning.
 *
 * The trailing sleep keeps the job alive briefly after the last record so that
 * processing-time timers registered downstream have a chance to fire before the
 * job shuts down.
 *
 * @param n number of values to emit; nothing is emitted when `n < 1`
 */
class MyLongSource(n: Int) extends ParallelSourceFunction[Long] {
  // Set from cancel(), which may run on a different thread than run().
  @volatile private var stop = false

  /**
   * Emits 1..n, checking the cancel flag before each element, then sleeps
   * unless the source has been cancelled.
   *
   * @param ctx source context used to emit records
   */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    // takeWhile re-checks the flag before every element, so no `return` from
    // inside a closure (a non-local return) is needed to stop early.
    (1 to n).iterator
      .takeWhile(_ => !stop)
      .foreach { i =>
        println("Sending " + i)
        ctx.collect(i.toLong)
      }

    // Skip the linger when cancelled so the source exits promptly.
    if (!stop) {
      Thread.sleep(1000)
    }
  }

  /** Requests that `run` stop emitting and return. */
  override def cancel(): Unit = {
    stop = true
  }
}

I was finally able to get consistent results by adding a Thread.sleep(1000) at the end of the source's run() method. It seems that once the source exits, the remaining messages are processed and then everything is shut down, even if there are still pending timers.

1

1 Answer

2
votes

When a Flink job shuts down, any pending processing time timers are simply ignored. They never fire.

For what it's worth, there's some ongoing discussion on the Flink dev mailing list about offering an option to trigger all pending processing time timers. See http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-FLIP-134-DataStream-Semantics-for-Bounded-Input-td37365.html#a37558.