I was writing some functions for test cases that they were been injected some shared references to an object; assume a SinkFunction
like this:
class Collector[T](collection: ListBuffer[T]) extends SinkFunction[T] {
override def invoke(in: T, context: SinkFunction.Context[_]): Unit = {
collection.append(in)
}
}
and test code:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = ListBuffer.empty[String]
env.fromElements("Hello").addSink(new Collector(list))
env.execute()
println(list)
I run my test, but in the end of test list
was empty!
I check the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) I find out that the origin example are using a singleton-reference.
So I'm asking to be sure about how Apache Flink works internally: Does it serialize all added functions to the flow, in even in local deployments?