
I'm trying to run an Apache Beam Python job on the Flink runner. I've started a minikube cluster on my local machine and port-forwarded 8081 to view the Flink dashboard at localhost:8081. Everything on the deployed Flink cluster seems to be running, but when I try to execute a basic pipeline (which runs fine on the DirectRunner), the completed jobs view of the Flink dashboard shows this stack trace:

Job failed during initialization of JobManager
org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException

My Beam pipeline looks like this:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    
    def run():
        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_master=localhost:8081",
            # "--environment_type=LOOPBACK"
        ])
    
        with beam.Pipeline(options=options) as p:
            lines = (
                p
                | 'create' >> beam.Create([
                    'This is a test',
                    'This is another line',
                    'Oh look another'
                ])
                | beam.Map(print)
            )
    
    if __name__ == "__main__":
        run()

I can't seem to find a way around this problem. The Flink version is v1.12.0.

1 Answer

I fixed the problem: I didn't have worker pools with the required Python 3.7 SDK.

    - name: beam-worker-pool
      image: apache/beam_python3.7_sdk
      args: ["--worker_pool"]
      ports:
      - containerPort: 50000
        name: pool
      livenessProbe:
        tcpSocket:
          port: 50000
        initialDelaySeconds: 30
        periodSeconds: 60
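
The livenessProbe in this container spec is a plain TCP check on port 50000. As a rough sketch, the same kind of check can be done from Python to confirm the worker pool is accepting connections. The helper name `port_is_open` is made up for illustration, and the check only means something when run from a place where the pool's port is actually reachable (for example inside the task manager pod, or after port-forwarding 50000, which is an assumption and not part of the setup above).

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical helper: it mimics a tcpSocket probe by only opening and
    closing a connection; it does not speak the worker pool's protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False
```

For example, `port_is_open("localhost", 50000)` returning False would suggest the worker pool container is not up or not reachable from where the check runs.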

The beam-worker-pool sidecar container needs to be added to the task manager's Kubernetes deployment. The pipeline options had to be changed to:

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.10",
        "--flink_master=localhost:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000"
    ])

Using --environment_type=EXTERNAL with --environment_config pointing at the worker pool, so that Beam's SDK harness runs there, did the trick. I tried Flink 1.12 with no luck, so I kept it at 1.10.
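
For reference, here is a minimal sketch of how the question's pipeline might look with these options applied. The helper `flink_external_args` and its default values are illustrative names made up here; the flags themselves are the ones listed above. The Beam imports sit inside `run()` only so that the flag helper can be used without Beam installed.

```python
def flink_external_args(flink_master="localhost:8081",
                        worker_pool="localhost:50000",
                        flink_version="1.10"):
    """Build the Flink runner flags from the answer (hypothetical helper)."""
    return [
        "--runner=FlinkRunner",
        f"--flink_version={flink_version}",
        f"--flink_master={flink_master}",
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
    ]


def run(argv=None):
    # Imported lazily; requires apache_beam and a reachable Flink cluster.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    args = argv if argv is not None else flink_external_args()
    options = PipelineOptions(args)

    # Same test pipeline as in the question.
    with beam.Pipeline(options=options) as p:
        (
            p
            | "create" >> beam.Create([
                "This is a test",
                "This is another line",
                "Oh look another",
            ])
            | "print" >> beam.Map(print)
        )
```

Calling `run()` submits the job. Since `print` executes inside the worker pool container, the output should show up in that container's logs and not in the local terminal.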