After Reading the official flink testing documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html) I was able to develop tests for a ProcessFunction, using a Test Harness, something like this:
pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")
testHarness =
new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
)
testHarness.open()
now, I’m trying to do the same for a ProcessAllWindowFunction, that looks like this:
class MapVersionValidationDistributor(batchSize: Int) extends
ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {
lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)
First I realized I can’t use TestHarness for ProcessAllWindowFunction, because it doesn’t have a processElement method. In this case, what unit test strategy should I follow?
EDIT: At the moment my test code looks like this:
val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
val mvv = new MapVersionValidationDistributor(1)
val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))
val ctx = mock[mvv.Context]
val streamContext = mock[RuntimeContext]
mvv.setRuntimeContext(streamContext)
mvv.open(mock[Configuration])
mvv.process(ctx,input3,collector)
and I'm getting this error:
Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }