This question was posted over a year ago, but I'll post an answer in case anyone stumbles upon it in the future.
The serialization exception you are seeing is likely this one:
Exception encountered when invoking run on a nested suite. *** ABORTED *** (610 milliseconds)
java.lang.NullPointerException:
  at java.util.Objects.requireNonNull(Objects.java:203)
  at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.<init>(StreamElementSerializer.java:64)
  at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:136)
  at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:77)
  at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70)
  at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:366)
  at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:165)
...
The reason is that the operator under test needs a TypeSerializer for the DataStream input type, and the test harness does not infer one for you. You have to supply the serializers yourself: the input serializer when constructing the testHarness, and the output serializer in the setup() method call.
So to test the example from the Flink docs you linked, you can do something like this (my implementation is in Scala, but you can adapt it to Java as well):
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
import org.apache.flink.streaming.runtime.tasks.{StreamTaskActionExecutor, TestProcessingTimeService}
import org.apache.flink.streaming.runtime.tasks.mailbox.{MailboxExecutorImpl, TaskMailboxImpl}
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}

/**
 * This test case is written using Flink 1.11+.
 * Older versions likely have a simpler constructor definition for [[AsyncWaitOperator]], so you might
 * have to remove the last two arguments (processingTimeService and mailboxExecutor).
 */
class AsyncDatabaseRequestSuite extends FunSuite with BeforeAndAfter with Matchers {

  var testHarness: OneInputStreamOperatorTestHarness[String, (String, String)] = _

  val TIMEOUT = 1000
  val CAPACITY = 1000
  val MAILBOX_PRIORITY = 0

  def createTestHarness(): Unit = {
    val operator = new AsyncWaitOperator[String, (String, String)](
      new AsyncDatabaseRequest {
        override def open(configuration: Configuration): Unit = {
          client = new MockDatabaseClient(host, port, credentials) // put your mock DatabaseClient object here
        }
      },
      TIMEOUT,
      CAPACITY,
      OutputMode.UNORDERED,
      new TestProcessingTimeService,
      new MailboxExecutorImpl(
        new TaskMailboxImpl,
        MAILBOX_PRIORITY,
        StreamTaskActionExecutor.IMMEDIATE
      )
    )

    // supply the TypeSerializer for the "input" type of the operator
    testHarness = new OneInputStreamOperatorTestHarness[String, (String, String)](
      operator,
      TypeExtractor.getForClass(classOf[String]).createSerializer(new ExecutionConfig)
    )

    // supply the TypeSerializer for the "output" type of the operator to the setup() call
    testHarness.setup(
      TypeExtractor.getForClass(classOf[(String, String)]).createSerializer(new ExecutionConfig)
    )
    testHarness.open()
  }

  before {
    createTestHarness()
  }

  after {
    testHarness.close()
  }

  test("Your test case goes here") {
    // fill in your test case here
  }
}
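As a rough sketch of what a test body could look like inside the suite above: this assumes your mock client completes every request, and that AsyncDatabaseRequest emits one (key, result) pair per input string, as in the Flink docs example. The test name, keys, and timestamps are made up for illustration, and I have not run this against every Flink version, so treat it as a starting point.

```scala
// Goes inside AsyncDatabaseRequestSuite, in place of the placeholder test.
test("emits one result per input key") {
  import scala.collection.JavaConverters._

  // processElement(value, timestamp) pushes a record into the operator
  testHarness.processElement("key-1", 1L)
  testHarness.processElement("key-2", 2L)

  // endInput() signals the end of input; the operator is expected to
  // wait for in-flight async requests before this call returns
  testHarness.endInput()

  // extractOutputValues() returns the emitted values without their StreamRecord wrappers
  val results = testHarness.extractOutputValues().asScala

  // UNORDERED mode gives no ordering guarantee, so compare the keys as a set
  // (assumes the first tuple element is the input key)
  results.map(_._1).toSet shouldBe Set("key-1", "key-2")
}
```

If your mock returns known values, you can assert on the second tuple element as well instead of only checking the keys.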