I have an Apache Beam pipeline that I am trying to run on a Flink cluster deployed locally in Docker.

The pipeline fails with

The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
org.apache.flink.api.java.RemoteEnvironmentConfigUtils.validate(RemoteEnvironmentConfigUtils.java:52)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.validateAndGetEffectiveConfiguration(RemoteStreamEnvironment.java:178)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:158)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:144)
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:113)
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.<init>(FlinkExecutionEnvironments.java:319)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:177)
org.apache.beam.runners.flink.FlinkExecutionEnvironments.createStreamExecutionEnvironment(FlinkExecutionEnvironments.java:139)
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:98)
org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:108)
ApacheBeamPocJava.main(ApacheBeamPocJava.java:262)

This is how I am setting up the pipeline

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("localhost:6123");
options.setFilesToStage(Arrays.asList("path to the beam jar"));
FlinkRunner flinkRunner = FlinkRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);

After defining the steps of the pipeline, I run it like this:

flinkRunner.run(p);

This is how I submit the job

flink run -c ClassName PATH_TO_JAR

Can someone advise what is going wrong here?

Also, if someone has Beam <-> Flink examples for Java handy, I would definitely appreciate those too.

1 Answer

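It seems that you have defined the execution environment inside the pipeline itself. When a job is submitted with flink run, the command line client already provides the execution context, so the remote environment that Beam tries to create from your flinkMaster setting is most likely what triggers the exception. Have you tried launching your pipeline as described in the Flink runner documentation? (Remove the parts of your code where you set or configure the runner.)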

Because Beam decouples your code from the runner that executes it, the Flink runner configuration does not need to live in your pipeline code. If your pipeline runs locally with the direct runner, it should also run on the Flink runner (or any other supported runner) when compiled with the right profile.

bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar --runner=FlinkRunner --other-parameters-for-your-pipeline-or-the-runner
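
For reference, a runner-agnostic main class could look roughly like the sketch below. It is only an illustration, not your actual pipeline: the class name RunnerAgnosticPipeline and the two toy transforms are made up, and it assumes the Beam Java SDK plus the Flink runner dependency are packaged into the jar you submit. The point is that the code never calls setRunner or setFlinkMaster; the runner comes from the --runner=... argument on the command line.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hypothetical example class; the name and the transforms are placeholders.
public class RunnerAgnosticPipeline {

  // Builds the pipeline graph from whatever options it is handed.
  // Nothing in here refers to Flink or to any other specific runner.
  static Pipeline build(PipelineOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("CreateWords", Create.of("beam", "flink"))
        .apply(
            "ToUpperCase",
            MapElements.into(TypeDescriptors.strings())
                .via((String word) -> word.toUpperCase()));
    return p;
  }

  public static void main(String[] args) {
    // The runner and its settings are read from the command-line arguments
    // (for example --runner=FlinkRunner) instead of being hard-coded.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();

    // Run through the Pipeline itself; there is no need to construct a
    // FlinkRunner instance by hand.
    build(options).run().waitUntilFinish();
  }
}
```

Submitted with the flink run command above (with -c pointing at this class), the same jar should pick up the cluster from the Flink client, and running it with the direct runner on the classpath should work without code changes.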

Please be aware that there is currently a bug in Beam 2.25.0 for the Flink runner, so try it with version 2.24.0, or a later version when it's released.