When I am trying to run a TFX pipeline/Apache Beam job on a Flink runner, it works fine when using 1 task manager (on one node) with parallelism 2 (2 task slots per task manager). But hangs when I try it with higher parallelism on more than one task manager with the message constantly repeating on both task managers:
INFO org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory [] - Still waiting for startup of environment from a65a0c5f8f962428897aac40763e57b0-1334930809.eu-central-1.elb.amazonaws.com:50000 for worker id 1-1
The Flink cluster runs on a native Kubernetes deployment on an AWS EKS Kubernetes Cluster.
I use the following parameters:
"--runner=FlinkRunner",
"--parallelism=4",
f"--flink_master={flink_url}:8081",
"--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:50000",
"--flink_submit_uber_jar",
"--worker_harness_container_image=none",
EDIT: Adding additional info about the configuratio
I have configured the Beam workers to run as side-cars (at least this is my understanding of how it should work), by setting the Flink parameter:
kubernetes.pod-template-file.taskmanager
it is pointing out to a template file with contents:
kind: Pod
metadata:
name: taskmanager-pod-template
spec:
#hostNetwork: true
containers:
- name: flink-main-container
#image: apache/flink:scala_2.12
env:
- name: AWS_REGION
value: "eu-central-1"
- name: S3_VERIFY_SSL
value: "0"
- name: PYTHONPATH
value: "/data/flink/src"
args: ["taskmanager"]
ports:
- containerPort: 6122 #22
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122 #22
initialDelaySeconds: 30
periodSeconds: 60
- name: beam-worker-pool
env:
- name: PYTHONPATH
value: "/data/flink/src"
- name: AWS_REGION
value: "eu-central-1"
- name: S3_VERIFY_SSL
value: "0"
image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
imagePullPolicy: Always
args: ["--worker_pool"]
ports:
- containerPort: 50000
name: pool
livenessProbe:
tcpSocket:
port: 50000
initialDelaySeconds: 30
periodSeconds: 60
I have also created a kubernetes load balancer for the task managers, so clients can connect on port 50000. So I use that address when configuring:
f"--environment_config={beam_sdk_url}:50000",
EDIT 2: Looks like the Beam SDK harness on one task manager wants to connect to the endpoint running on the other task manager, but looks for it on localhost:
Log from beam-worker-pool on TM 2:
2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial server at localhost:33705
caused by:
context deadline exceeded
The provision endpoint on TM 1 is the one actually listening on the port 33705, while this is looking for it on localhost, so cannot connect to it.
EDIT 3: Showing how I test this:
...............
TM 1:
========
$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial server at localhost:40983
caused by:
context deadline exceeded
TM 2:
=========
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial server at localhost:32907
caused by:
context deadline exceeded
Testing:
.........................
TM 1:
============
$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...
TM 2:
=============
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output
Not sure how to fix this.
Thanks, Gorjan