4
votes

I have run into some issues recently while developing Flink jobs that use Spring and Hibernate and run on a Flink cluster. I need to initialize the Spring resources before the Flink operators run on each task manager, rather than on the job manager, but I cannot find any suitable method on StreamExecutionEnvironment to do that.

I have tried an approach like the one below:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Spring is initialized here, on the client that submits the job
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
// ETL business logic expressed as Flink operators
FlinkOperators.run();
env.execute();

However, when a Flink job with a parallelism greater than one executes, the Spring initialization does NOT happen in each task manager process, so I cannot use Spring inside the Flink operators.

Is there any approach to initializing Spring resources in a Flink job?

Thanks.

Best Regards.

Alvin

1
I have the same problem right now. Did you find a solution? – zt1983811

1 Answer

0
votes

Whenever you need some kind of context initialization in each task manager, build a static function (a function inside an object if you use Scala) that stores the initialized values in a static variable.

That should be enough, because static values are stored in the memory of each task manager process.
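A minimal sketch of the static-holder pattern described above, written generically so it is not tied to Spring: the initializer runs at most once per JVM (i.e. once per task manager process), no matter how many parallel subtasks call it. The class name `TaskManagerHolder` and the idea of wrapping the Spring context in it are assumptions for illustration.

```java
import java.util.function.Supplier;

/**
 * Lazily initializes a value once per JVM using double-checked locking.
 * Held in a static field, the value is shared by all subtasks running
 * in the same task manager process.
 */
public final class TaskManagerHolder<T> {
    private final Supplier<T> initializer;
    private volatile T value;

    public TaskManagerHolder(Supplier<T> initializer) {
        this.initializer = initializer;
    }

    public T get() {
        T local = value;
        if (local == null) {
            synchronized (this) {
                local = value;
                if (local == null) {
                    // Runs only on the first call in this JVM
                    value = local = initializer.get();
                }
            }
        }
        return local;
    }

    // Hypothetical usage with Spring (requires Spring on the classpath):
    // static final TaskManagerHolder<ApplicationContext> CONTEXT =
    //     new TaskManagerHolder<>(() ->
    //         new ClassPathXmlApplicationContext("classpath:applicationContext.xml"));
}
```

Operators would then call `CONTEXT.get()` from inside their processing methods (or from a `RichFunction`'s `open()`), so the context is created lazily on the task manager rather than eagerly on the client.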

I use this approach to load properties files in each task manager; those property files contain per-job configuration. If you are loading files, check that each task manager has a copy of the file you want to load.
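The properties-file variant of the same idea might look like the sketch below. The file name `job.properties` is an assumption; the file must be on the classpath of every task manager for the load to succeed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Loads a classpath properties file at most once per task manager JVM. */
public final class JobProperties {
    private static volatile Properties props;

    private JobProperties() {}

    public static Properties get() {
        if (props == null) {
            synchronized (JobProperties.class) {
                if (props == null) {
                    Properties p = new Properties();
                    // "job.properties" is an assumed file name; it must be
                    // present on each task manager's classpath
                    try (InputStream in = JobProperties.class
                            .getClassLoader()
                            .getResourceAsStream("job.properties")) {
                        if (in != null) {
                            p.load(in);
                        }
                    } catch (IOException e) {
                        throw new IllegalStateException("Failed to load job.properties", e);
                    }
                    props = p;
                }
            }
        }
        return props;
    }
}
```

All subtasks in the same task manager then share one `Properties` instance via `JobProperties.get()`.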