1
votes

After Reading the official flink testing documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html) I was able to develop tests for a ProcessFunction, using a Test Harness, something like this:

pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")

testHarness =
  new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
    new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
  )

testHarness.open()

now, I’m trying to do the same for a ProcessAllWindowFunction, that looks like this:

class MapVersionValidationDistributor(batchSize: Int) extends
  ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {

lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)

First I realized I can’t use TestHarness for ProcessAllWindowFunction, because it doesn’t have a processElement method. In this case, what unit test strategy should I follow?

EDIT: At the moment my test code looks like this:

val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
    val mvv = new MapVersionValidationDistributor(1)
    val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))

    val ctx = mock[mvv.Context]
    val streamContext =  mock[RuntimeContext]
    mvv.setRuntimeContext(streamContext)

    mvv.open(mock[Configuration])
    mvv.process(ctx,input3,collector)

and I'm getting this error:

 Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }
1
You just want to test the behavior of You process method on some window right ??Dominik Wosiński
yes, that's basically what I’m trying to do :) thanks for the reply btwDiogo Araújo

1 Answers

0
votes

You don't really need test harness to unit test the process method of the ProcessAllWindowFunction. The process function takes 3 arguments: Context, Iterable[IN], Collector[OUT]. You can use some library depending on the language used to mock the Context. You can also easily implement or mock the Collector depending on your prerefences here. And the Iterable[IN] is just an Iterable containing the elements of Your window, that would be passed to the function after the window is triggered.