I'm trying to run a Python Beam job on the Flink Runner. I've started a minikube cluster on my local machine and port-forwarded 8081 so I can view the Flink Dashboard at localhost:8081. Everything on the deployed Flink cluster appears to be running, but when I try to execute a basic pipeline (one that runs fine on DirectRunner), the Completed Jobs page of the Flink dashboard shows this stack trace:
Job failed during initialization of JobManager
org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
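For context, the dashboard is exposed with a plain kubectl port-forward, roughly as below; the flink-jobmanager service name comes from my own deployment, so treat it as an assumption about naming rather than anything standard.

kubectl port-forward service/flink-jobmanager 8081:8081

The UI loads fine at http://localhost:8081 and the cluster looks healthy there.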
My Beam pipeline looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    # Point the FlinkRunner at the port-forwarded JobManager.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",
        # "--environment_type=LOOPBACK"
    ])

    with beam.Pipeline(options=options) as p:
        lines = (
            p
            | 'create' >> beam.Create([
                'This is a test',
                'This is another line',
                'Oh look another'
            ])
            | beam.Map(print)
        )


if __name__ == "__main__":
    run()
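I'm submitting the job by simply running the script from my machine, outside the cluster; the file name here is only illustrative:

python my_pipeline.py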
I can't seem to find a way around this problem. The Flink version is 1.12.0.