10
votes

What ways do we have available to connect to a Google Cloud SQL (MySQL) instance from the newly introduced Google Cloud Composer? The intention is to get data from a Cloud SQL instance into BigQuery (perhaps with an intermediary step through Cloud Storage).

  1. Can the Cloud SQL Proxy be exposed in some way on pods that are part of the Kubernetes cluster hosting Composer?

  2. If not, can the Cloud SQL Proxy be brought in by using the Kubernetes Service Broker? -> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker

  3. Should Airflow be used to schedule and call GCP API commands, i.e. 1) export the MySQL table to Cloud Storage, 2) read the export from Cloud Storage into BigQuery?

  4. Perhaps there are other methods that I am missing to get this done

6

Raj, did you finally find a way? I'm also trying to do so. – Thibault Clement

6 Answers

5
votes

"The Cloud SQL Proxy provides secure access to your Cloud SQL Second Generation instances without having to whitelist IP addresses or configure SSL." -Google CloudSQL-Proxy Docs

The Cloud SQL Proxy seems to be the recommended way to connect to Cloud SQL. So in Composer, as of release 1.6.1, we can create a new Kubernetes Pod to run the gcr.io/cloudsql-docker/gce-proxy:latest image, expose it through a Service, and then create a Connection in Composer to use in the operator.

To get set up:

  • Follow Google's documentation

  • Test the connection using info from Arik's Medium Post

    • Check that the pod was created: kubectl get pods --all-namespaces

    • Check that the service was created: kubectl get services --all-namespaces

    • Jump into an Airflow worker pod: kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash

      • Test the MySQL connection: mysql -u composer -p --host <service-name>.default.svc.cluster.local

Notes:

  • Composer now uses namespaces to organize pods

  • Pods in different namespaces don't talk to each other unless you give them the full path <k8-service-name>.<k8-namespace-name>.svc.cluster.local

  • Creating a new Composer Connection with the full path as the host enables a successful connection; a minimal usage sketch follows below
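
For reference, here is a minimal sketch of calling through that Connection from a task. The connection ID cloudsql_proxy is a made-up name; it is assumed to be a MySQL-type Connection whose host is the fully qualified service name.

from airflow.hooks.mysql_hook import MySqlHook


def check_cloudsql_connection():
    # The hook resolves host/credentials from the "cloudsql_proxy" Connection,
    # whose host points at the in-cluster proxy service, e.g.
    # <k8-service-name>.default.svc.cluster.local on port 3306.
    hook = MySqlHook(mysql_conn_id="cloudsql_proxy")
    for row in hook.get_records("SELECT NOW()"):
        print(row)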

4
votes

We had the same problem but with a Postgres instance. This is what we did, and got it to work:

  • create a sqlproxy deployment in the Kubernetes cluster where airflow runs. This was a copy of the existing airflow-sqlproxy used by the default airflow_db connection with the following changes to the deployment file:

    • replace all instances of airflow-sqlproxy with the new proxy name
    • edit under 'spec: template: spec: containers: command: -instances', replace the existing instance name with the new instance we want to connect to
  • create a kubernetes service, again as a copy of the existing airflow-sqlproxy-service with the following changes:

    • replace all instances of airflow-sqlproxy with the new proxy name
    • under 'spec: ports', change to the appropriate port (we used 5432 for a Postgres instance)
  • in the airflow UI, add a connection of type Postgres with host set to the newly created service name.
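
As a rough illustration (the connection ID below is a placeholder, not something from the steps above), a task could then talk to the instance through PostgresHook using that connection:

from airflow.hooks.postgres_hook import PostgresHook


def check_postgres_connection():
    # "postgres_proxy" stands in for whatever you named the Postgres-type
    # connection that points at the copied sqlproxy service on port 5432.
    hook = PostgresHook(postgres_conn_id="postgres_proxy")
    print(hook.get_first("SELECT version()"))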

2
votes

You can follow these instructions to launch a new Cloud SQL proxy instance in the cluster.

re #3: That sounds like a good plan. There isn't a Cloud SQL to BigQuery operator to my knowledge, so you'd have to do it in two phases like you described.
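
For example, a sketch of the two phases with the stock Airflow 1.10 contrib operators; every connection ID, bucket, dataset and table name here is a placeholder:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG(
    dag_id="mysql_to_bigquery_example",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
    catchup=False
)

# Phase 1: dump the MySQL table to newline-delimited JSON files in Cloud Storage.
mysql_to_gcs = MySqlToGoogleCloudStorageOperator(
    task_id="mysql_to_gcs",
    mysql_conn_id="cloudsql_proxy",  # connection that goes through the SQL proxy
    sql="SELECT * FROM my_table",
    bucket="my-staging-bucket",
    filename="exports/my_table/{{ ds }}/part-{}.json",
    dag=dag
)

# Phase 2: load the exported files from Cloud Storage into BigQuery.
gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id="gcs_to_bq",
    bucket="my-staging-bucket",
    source_objects=["exports/my_table/{{ ds }}/part-*.json"],
    destination_project_dataset_table="my_project.my_dataset.my_table",
    source_format="NEWLINE_DELIMITED_JSON",
    write_disposition="WRITE_TRUNCATE",
    autodetect=True,
    dag=dag
)

mysql_to_gcs >> gcs_to_bq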

1
votes

Adding the Medium post from @Leo's comment to the top level: https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53 . Once you follow that article and have the service set up, you can connect from your DAG using SQLAlchemy like this:

import os
from datetime import datetime, timedelta
import logging

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"

default_args = {
    'start_date': datetime(2019, 7, 16)
}


def connect_to_cloud_sql():
    '''
        Create a connection to CloudSQL
    :return:
    '''
    import sqlalchemy
    try:
        # Fill in <user>, <password>, <cluster_ip> (or the proxy service name) and <dbname>
        PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
        logger.info("DB URL: %s", PROXY_DB_URL)
        engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
        for result in engine.execute("SELECT NOW() as now"):
            logger.info(dict(result))
    except Exception:
        logger.exception("Unable to interact with CloudSQL")


dag = DAG(
    dag_id="example_sqlalchemy",
    default_args=default_args,
    # schedule_interval=timedelta(minutes=5),
    catchup=False  # If you don't set this then the dag will run according to start date
)


t1 = PythonOperator(
    task_id="example_sqlalchemy",
    python_callable=connect_to_cloud_sql,
    dag=dag
)


if __name__ == "__main__":
    connect_to_cloud_sql()

0
votes

Here, in Hoffa's answer to a similar question, you can find a reference on how WePay keeps BigQuery synchronized every 15 minutes using an Airflow operator.

From said answer:

Take a look at how WePay does this:

The MySQL to GCS operator executes a SELECT query against a MySQL table. The SELECT pulls all data greater than (or equal to) the last high watermark. The high watermark is either the primary key of the table (if the table is append-only), or a modification timestamp column (if the table receives updates). Again, the SELECT statement also goes back a bit in time (or rows) to catch potentially dropped rows from the last query (due to the issues mentioned above).

With Airflow they manage to keep BigQuery synchronized to their MySQL database every 15 minutes.
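
As a loose illustration of that high-watermark pattern (not WePay's actual code; the table, column, bucket and connection names are invented), an incremental pull with the contrib MySQL-to-GCS operator might look like this:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

with DAG(
        "incremental_mysql_sync",
        start_date=datetime(2019, 1, 1),
        schedule_interval="*/15 * * * *",
        catchup=False) as dag:

    incremental_export = MySqlToGoogleCloudStorageOperator(
        task_id="incremental_export",
        mysql_conn_id="cloudsql_proxy",
        # Pull rows modified since the previous run; starting from
        # prev_execution_date gives a small overlap to catch late rows.
        sql=("SELECT * FROM my_table "
             "WHERE modified_at >= '{{ prev_execution_date }}'"),
        bucket="my-staging-bucket",
        filename="exports/my_table/{{ ts_nodash }}/part-{}.json"
    )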

0
votes

Now we can connect to Cloud SQL without creating a Cloud SQL proxy ourselves; the operator will create it automatically. The code looks like this:

from datetime import datetime

from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator

# Replace these placeholders with your own values.
GCP_PROJECT_ID = 'my-project'
INSTANCE_NAME = 'my-cloudsql-instance'
DB_NAME = 'my-database'
EXPORT_URI = 'gs://my-bucket/exports/export.csv'
SQL = 'SELECT * FROM my_table'

export_body = {
    'exportContext': {
        'fileType': 'CSV',
        'uri': EXPORT_URI,
        'databases': [DB_NAME],
        'csvExportOptions': {
            'selectQuery': SQL
        }
    }
}

default_dag_args = {
    'start_date': datetime(2019, 1, 1)
}

with DAG(
        'postgres_test',
        schedule_interval='@once',
        default_args=default_dag_args) as dag:

    sql_export_task = CloudSqlInstanceExportOperator(
        project_id=GCP_PROJECT_ID,
        body=export_body,
        instance=INSTANCE_NAME,
        task_id='sql_export_task'
    )