
When I run a TFX pipeline (an Apache Beam job) on the Flink runner, it works fine with 1 task manager (on one node) and parallelism 2 (2 task slots per task manager). But it hangs when I use higher parallelism across more than one task manager, with the following message repeating constantly on both task managers:

INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory [] - Still waiting for startup of environment from a65a0c5f8f962428897aac40763e57b0-1334930809.eu-central-1.elb.amazonaws.com:50000 for worker id 1-1

The Flink cluster runs on a native Kubernetes deployment on an AWS EKS Kubernetes Cluster.

I use the following parameters:

        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        f"--environment_config={beam_sdk_url}:50000",
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",
    

EDIT: Adding additional info about the configuration

I have configured the Beam workers to run as side-cars (at least this is my understanding of how it should work), by setting the Flink parameter:

kubernetes.pod-template-file.taskmanager

It points to a template file with the following contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
     #hostNetwork: true
     containers:
      - name: flink-main-container
        #image: apache/flink:scala_2.12
        env:
          - name: AWS_REGION
            value: "eu-central-1"
          - name: S3_VERIFY_SSL
            value: "0"
          - name: PYTHONPATH
            value: "/data/flink/src"
        args: ["taskmanager"]
        ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122 #22
          initialDelaySeconds: 30
          periodSeconds: 60
      - name: beam-worker-pool
        env:
          - name: PYTHONPATH
            value: "/data/flink/src"
          - name: AWS_REGION
            value: "eu-central-1"
          - name: S3_VERIFY_SSL
            value: "0"
        image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
        imagePullPolicy: Always
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60

I have also created a Kubernetes load balancer for the task managers, so that clients can connect on port 50000. I use that address when configuring:

f"--environment_config={beam_sdk_url}:50000",

EDIT 2: Looks like the Beam SDK harness on one task manager wants to connect to the endpoint running on the other task manager, but looks for it on localhost:

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705
    caused by:
context deadline exceeded

The provision endpoint on TM 1 is the one actually listening on port 33705, but the SDK harness on TM 2 looks for it on its own localhost, so it cannot connect.

EDIT 3: Showing how I test this:

...............

TM 1:
========
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983
    caused by:
context deadline exceeded

TM 2:
=========
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907
    caused by:
context deadline exceeded

Testing:
.........................

TM 1:
============
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused

root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...


TM 2:
=============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output

Not sure how to fix this.

Thanks, Gorjan

Just to clarify, when you run with 1 task manager you're still connecting to the same endpoint for your external environment, right? It sounds like there's some kind of conflict caused by both task managers trying to create an external environment at that endpoint (source code), but I'm not sure if that's due to a known limitation, a bug, or some sort of user error. - Daniel Oliveira
Yes, @DanielOliveira, when running with one task manager I use the same endpoint. I have edited my question with a bit more detail about the configuration. - Gorjan Todorovski

2 Answers

1
votes

It's not recommended to try to connect to the same environment with different task managers. Usually we recommend setting up the Beam workers as side cars to the task managers so there's a 1:1 correspondence, then connecting via localhost. See the example config at https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml and https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_wordcount_py.yaml

0
votes

I was able to fix this by setting the Beam SDK address to localhost instead of using a load balancer. So the config I use now is:

        "--runner=FlinkRunner",
        "--parallelism=4",
        f"--flink_master={flink_url}:8081",
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000", # <--- Changed the address to localhost
        "--flink_submit_uber_jar",
        "--worker_harness_container_image=none",
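To guard against accidentally pointing the external environment at a load balancer again, a small check on the argument list could be run before submitting the pipeline. This is only a sketch: the function name `environment_host_is_loopback` is hypothetical and not part of Beam or TFX, and it only inspects the `--environment_config` string without contacting anything.

```python
import ipaddress


def environment_host_is_loopback(args):
    """Return True if --environment_config in args targets localhost/loopback."""
    prefix = "--environment_config="
    for arg in args:
        if not arg.startswith(prefix):
            continue
        endpoint = arg[len(prefix):]
        # Split "host:port"; fall back to the whole string if no port is given.
        host, sep, _port = endpoint.rpartition(":")
        if not sep:
            host = endpoint
        host = host.strip("[]")  # allow bracketed IPv6 such as [::1]:50000
        if host == "localhost":
            return True
        try:
            return ipaddress.ip_address(host).is_loopback
        except ValueError:
            # A DNS name other than localhost, e.g. a load balancer address.
            return False
    return False
```

For example, `environment_host_is_loopback(["--environment_config=localhost:50000"])` returns `True`, while an argument list that uses the load balancer address returns `False`.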