
Huge thanks to the Google team for some great cloud products. I was hoping someone could point out what I'm missing in the implementation below.

I'm presently orchestrating a TFX pipeline via Airflow and, by extension, Cloud Composer. A condensed version of the code is below:

# imports assumed for this condensed snippet (TFX 0.25)
from tfx.components import CsvExampleGen
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.airflow.airflow_dag_runner import AirflowDagRunner, AirflowPipelineConfig

# other code setting up unrelated variables above ...
metadata_config = metadata.mysql_metadata_connection_config(host=DATABASE_IP, port=3306,
                                                            database=DATABASE_NAME, username=USERNAME,
                                                            password=PASSWORD)

def create_pipeline(pipeline_name, pipeline_root, data_root, transform_module, train_module, serving_root,
                    beam_pipeline_args, metadata_config=None):
    """create beam pipeline"""
    example_gen = CsvExampleGen(input_base=data_root)

    # unrelated tfx code ....
    
    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=[
            example_gen,
            # ...other components ...
        ],
        enable_cache=True,
        beam_pipeline_args=beam_pipeline_args,
        metadata_connection_config=metadata_config
    )

# var for airflow to detect DAG
DAG = AirflowDagRunner(AirflowPipelineConfig(airflow_config)).run(
    create_pipeline(pipeline_name=beam_pipeline_name, pipeline_root=beam_pipeline_root, data_root=data_root,
                    transform_module=transform_module, train_module=train_module, serving_root=serving_root,
                    metadata_config=metadata_config,
                    beam_pipeline_args=local_pipeline_args)
)

The pipeline runs fine on my local machine, but fails when it tries to access the metadata store in Cloud SQL. Specifically, it fails while executing the CsvExampleGen component, at the point where it reads/writes metadata via metadata_config.

(screenshot of the error)

Accordingly, I proceeded to read the following documentation from Google on how to configure a connection between Cloud Composer & Cloud SQL:

Connecting From Google Kubernetes Engine https://cloud.google.com/sql/docs/mysql/connect-kubernetes-engine

Managing Airflow Connections In Cloud Composer https://cloud.google.com/composer/docs/how-to/managing/connections

As directed by the first guide, I created a YAML file for the Cloud SQL proxy Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: cloudsql-proxy
spec:
  selector:
    matchLabels:
      app: cloudsql-proxy
  template:
    metadata:
      labels:
        app: cloudsql-proxy
    spec:
      containers:
      - name: cloudsql-proxy
        #image: gcr.io/cloudsql-docker/gce-proxy:1.17
        # ... other container configuration
        env:
        - name: DB_USER
          valueFrom:
            secretKeyRef:
              name: cloudsql-token
              key: username
        - name: DB_PASS
          valueFrom:
            secretKeyRef:
              name: cloudsql-token
              key: password
        - name: DB_NAME
          valueFrom:
            secretKeyRef:
              name: cloudsql-token
              key: database
        - name: "PORT"
          value: "50001"
      #- name: cloud-sql-proxy
        # It is recommended to use the latest version of the Cloud SQL proxy
        # Make sure to update on a regular schedule!
        image: gcr.io/cloudsql-docker/gce-proxy:1.17
        command:
          - "/cloud_sql_proxy"

          # If connecting from a VPC-native GKE cluster, you can use the
          # following flag to have the proxy connect over private IP
          # - "-ip_address_types=PRIVATE"

          # Replace DB_PORT with the port the proxy should listen on
          # Defaults: MySQL: 3306, Postgres: 5432, SQLServer: 1433
          - "-instances=vm-intro-285512:us-central1:recommender-metadata=tcp:3306"

        # [START cloud_sql_proxy_k8s_volume_mount]
          # This flag specifies where the service account key can be found
          - "-credential_file=/Users/michaelma/.gcp/credentials/credentials.json"
        securityContext:
          # The default Cloud SQL proxy image runs as the
          # "nonroot" user and group (uid: 65532) by default.
          runAsNonRoot: true
        volumeMounts:
        - name: service-account-token
          mountPath: /Users/michaelma/.gcp/credentials
          readOnly: true
          # [END cloud_sql_proxy_k8s_volume_mount]
      # [START cloud_sql_proxy_k8s_volume_secret]
      volumes:
      - name: service-account-token
        secret:
          secretName: service-account-token
      # [END cloud_sql_proxy_k8s_volume_secret]

One thing to note is that I made a slight alteration to the template Google provides: that version contains two containers, one for a user application and another for the proxy. Since Cloud Composer is managing my application via Airflow, I removed the application container and am launching only the proxy container. Could this be the issue?

As directed by the second guide, I also created a YAML file for the Service that exposes the above proxy:

apiVersion: v1
kind: Service
metadata:
  name: cloudsql-proxy-service
spec:
  type: LoadBalancer
  selector:
    app: cloudsql-proxy
  ports:
  - protocol: TCP
    port: 60000
    targetPort: 50001

I then created a Cloud Composer environment with the default settings and added the following PyPI packages via the Cloud Console:

numpy==1.16.0
tfx==0.25.0
tensorflow-model-analysis==0.25.0

I then moved my DAG and data files into their appropriate folders in the Composer environment's designated bucket, launched the Cloud SQL proxy and the Service exposing it via kubectl (along with all other configuration steps specified in the guides above), and triggered the DAG via the Airflow UI.

I was greeted by the same delightful error.

It seems that despite the Cloud SQL proxy pod and Service, the DAG still can't access the Cloud SQL instance.

One other thing I tried was adding the proxy's external IP to the Cloud SQL instance's list of authorized networks; this produced no change. It's also worth noting that the service account associated with the project has Editor privileges.

Anyone got a clue here? I suspect it may be something really simple in my YAML files or Cloud SQL configuration ...

1 Answer


1. You have to make sure that the cloudsql-proxy Deployment and Service are deployed in the same namespace as the airflow-worker, that is, the namespace created by Cloud Composer. Get the namespace with this command (make sure you are connected to the Composer environment's Kubernetes cluster):

kubectl get namespaces | grep composer | cut -d ' ' -f1
# e.g. composer-1-12-4-airflow-1-10-10-xxxxxxx

Add the namespace under metadata in both YAML files, for the Deployment and the Service:

...
kind: Deployment
metadata:
  name: cloudsql-proxy
  namespace: composer-1-12-4-airflow-1-10-10-xxxxxxx
...
...
kind: Service
metadata:
  name: cloudsql-proxy-service
  namespace: composer-1-12-4-airflow-1-10-10-xxxxxxx
...

2. When you create the metadata connection, use the Service name as the host:

metadata_config = metadata.mysql_metadata_connection_config(host="cloudsql-proxy-service", ...)
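For completeness, here is a minimal sketch of what the full call might look like against the Service defined in the question. The host is the Kubernetes Service name; the port shown is the Service's exposed port (60000, taken from the question's Service YAML) and is assumed to route to the port the proxy actually listens on, so adjust it to match your own port/targetPort and -instances=...=tcp:PORT settings:

# Hypothetical sketch -- values other than the host come from the question's config.
metadata_config = metadata.mysql_metadata_connection_config(
    host="cloudsql-proxy-service",  # Kubernetes Service name, resolvable from the airflow workers
    port=60000,                     # the Service's exposed port (must route to the proxy's listening port)
    database=DATABASE_NAME,
    username=USERNAME,
    password=PASSWORD)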