
I want to use a non-serializable object inside stream.map(), like this:

stream.map { i =>
  val obj = new SomeUnserializableClass()
  obj.doSomething(i)
}

This is very inefficient, because it creates a new SomeUnserializableClass instance for every element. It only needs to be created once on each worker.

In Spark, I can use mapPartitions to do this, but I don't know how to do it with the Flink streaming API.


1 Answer


If you are dealing with a non-serializable class, I recommend creating a RichFunction; in your case, a RichMapFunction.

Rich functions in Flink have an open() method that is called once per parallel task instance on the TaskManager, before any records are processed, so it acts as an initializer.

So the trick is to mark the field transient (so it is skipped when Flink serializes the function to ship it to the workers) and instantiate it in open().

See the example below:

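import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Input and output types are left as Object because the question doesn't say what
// doSomething() accepts or returns; replace them with your real types.
public class NonSerializableFieldMapFunction extends RichMapFunction<Object, Object> {

    // transient: the field is skipped when the function is serialized and shipped
    // to the TaskManagers, so the non-serializable object is never serialized
    private transient SomeUnserializableClass someUnserializableClass;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Runs once per parallel task instance, before the first call to map()
        this.someUnserializableClass = new SomeUnserializableClass();
    }

    @Override
    public Object map(Object o) throws Exception {
        // Reuses the single instance created in open()
        return someUnserializableClass.doSomething(o);
    }
}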

Then your code will look like this:

stream.map(new NonSerializableFieldMapFunction())
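To see why the transient field plus open() combination works, here is a minimal plain-Java sketch with no Flink dependency. It assumes that shipping a function to a worker behaves roughly like a Java serialization round trip; the names Expensive, MyMapFunction and roundTrip are hypothetical, and open() here is an ordinary method that only mimics the role of Flink's open(), not Flink's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for SomeUnserializableClass: it does NOT implement Serializable.
    static class Expensive {
        static int instances = 0; // counts how many times the constructor runs

        Expensive() { instances++; }

        String doSomething(int i) { return "processed-" + i; }
    }

    // Hypothetical stand-in for a rich function (plain Java, not Flink's RichMapFunction).
    static class MyMapFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: Java serialization skips this field, so the
        // non-serializable object is never written out
        private transient Expensive expensive;

        // Mimics open(): called once on the "worker" copy, after deserialization
        void open() { expensive = new Expensive(); }

        String map(int i) { return expensive.doSomething(i); }
    }

    // Mimics shipping the function to a worker: serialize it, then deserialize a copy.
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        MyMapFunction onWorker = roundTrip(new MyMapFunction()); // succeeds: the transient field is skipped
        onWorker.open();                                         // one Expensive instance for this copy
        for (int i = 0; i < 3; i++) {
            System.out.println(onWorker.map(i));                 // reuses that single instance
        }
        System.out.println("instances created: " + Expensive.instances);
    }
}
```

Running main() should print processed-0, processed-1, processed-2 and then "instances created: 1". If you instead dropped transient and initialized the field eagerly (private Expensive expensive = new Expensive();), writeObject would throw java.io.NotSerializableException, which is the same kind of failure you would hit when Flink tries to serialize the function.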

P.S.: I'm using Java syntax; please adapt it to Scala.