I was writing some test functions that are injected with a shared reference to an object. Consider a SinkFunction like this:

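import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.mutable.ListBuffer

// Appends every record it receives to the buffer passed in by the test
class Collector[T](collection: ListBuffer[T]) extends SinkFunction[T] {
    override def invoke(in: T, context: SinkFunction.Context[_]): Unit = {
        collection.append(in)
    }
}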

and test code:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = ListBuffer.empty[String]
env.fromElements("Hello").addSink(new Collector(list))
env.execute()
println(list)

I ran my test, but at the end of the test the list was empty! I checked the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html) and found that the original example uses a singleton reference.

So I'm asking to be sure about how Apache Flink works internally: does it serialize all functions added to the flow, even in local deployments?

1 Answer

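Yes, Flink serializes all functions, even in local deployments. For instance, if you take a look at SinkFunction you will notice it extends Serializable. The sink that actually runs is therefore a deserialized copy holding its own copy of the list, which is why the list in your test stays empty.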
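The effect can be reproduced without Flink. The following is only a minimal sketch using plain Java serialization: `LocalCollector`, `SerializationCopyDemo` and `roundTrip` are hypothetical names made up for this illustration, not Flink APIs, and Flink's actual shipping of functions involves more than this round trip.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the Collector sink: a serializable object holding a buffer.
class LocalCollector[T](val collection: ListBuffer[T]) extends java.io.Serializable {
  def invoke(in: T): Unit = collection.append(in)
}

object SerializationCopyDemo {
  // Serialize the object to bytes and read it back, yielding an independent copy.
  def roundTrip[A <: java.io.Serializable](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val list = ListBuffer.empty[String]
    val copy = roundTrip(new LocalCollector(list))
    copy.invoke("Hello")     // appends to the copy's own buffer
    println(list)            // ListBuffer()
    println(copy.collection) // ListBuffer(Hello)
  }
}
```

The buffer is serialized together with the object that references it, so the element ends up in the copy and the original `list` is never touched.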

If you want to share data between the job and the client (the program that sends the job to Flink), you'll have to manage it yourself using files, sockets, messaging (RMQ, Kafka), or a similar mechanism.
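As a rough sketch of the file option, the function can carry only a file path, which survives serialization, and append each record to that file. `FileCollector`, `FileSharingDemo` and `roundTrip` are hypothetical names for this illustration, and the serialization round trip merely stands in for Flink shipping the function. It also assumes the job and the client can see the same file system, which holds for a local run but not necessarily on a cluster.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// Hypothetical stand-in for a sink: it holds a path (a String), not a live collection.
class FileCollector(path: String) extends java.io.Serializable {
  def invoke(in: String): Unit =
    Files.write(
      Paths.get(path),
      (in + "\n").getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND)
}

object FileSharingDemo {
  // Serialize and deserialize, imitating the copy the job would work with.
  def roundTrip[A <: java.io.Serializable](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val file = Files.createTempFile("collector", ".txt")
    val shipped = roundTrip(new FileCollector(file.toString))
    shipped.invoke("Hello") // written by the deserialized copy

    // The client reads the result back from the shared file.
    val content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim
    println(content) // Hello
  }
}
```

Unlike the shared `ListBuffer`, the file is external state that both the original object and its copy refer to by name, so what the copy writes is visible to the client.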