3
votes

I want to connect to a Google Cloud SQL PostgreSQL instance from an Apache Beam pipeline running on Google Dataflow.
I want to do this using the Python SDK.
I can't find proper documentation for this.
The Cloud SQL how-to guides don't cover Dataflow:
https://cloud.google.com/sql/docs/postgres/

Can someone provide a documentation link or a GitHub example?

2 Answers

5
votes

You can use the relational_db.Write and relational_db.Read transforms from beam-nuggets as follows:

First, install beam-nuggets:

pip install beam-nuggets

For reading:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',  # placeholder: replace with your database host
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
    )
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name='months',
    )
    records | 'Writing to stdout' >> beam.Map(print)

For writing:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    months = p | "Reading month records" >> beam.Create([
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
    ])
    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='localhost',  # placeholder: replace with your database host
        port=5432,
        username='postgres',
        password='password',
        database='calendar',
        create_if_missing=True,
    )
    table_config = relational_db.TableConfiguration(
        name='months',
        create_if_missing=True
    )
    months | 'Writing to DB' >> relational_db.Write(
        source_config=source_config,
        table_config=table_config
    )
1
votes

The Java SDK contains JdbcIO, which can connect to any database accessible through the standard Java JDBC mechanism. There is currently no analogue in the Beam Python SDK. If there were, I suppose it would use the Python DB-API. Feel free to file a feature request or contribute: it should be fairly straightforward to develop (e.g. by mimicking the source code of the Java JdbcIO) and it would be very useful :)
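
To make the DB-API idea a bit more concrete, here is a minimal sketch of the read and write halves written against DB-API 2.0 calls only (cursor(), execute(), executemany(), fetchmany(), commit(), close()). This is not Beam code and not how JdbcIO is implemented. The helper names read_rows, write_rows and demo are made up for illustration, and the standard-library sqlite3 module stands in for a PostgreSQL driver such as pg8000 so the snippet runs without a database server. In a pipeline, logic like this would presumably be called from a DoFn; that wiring is not shown.

```python
import os
import sqlite3
import tempfile
from typing import Any, Callable, Dict, Iterator, List, Sequence


def read_rows(connect: Callable[[], Any], query: str,
              fetch_size: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield each result row of `query` as a dict (hypothetical helper)."""
    conn = connect()
    try:
        cur = conn.cursor()
        cur.execute(query)
        # cursor.description holds one 7-item sequence per column; item 0 is the name.
        columns = [col[0] for col in cur.description]
        while True:
            batch = cur.fetchmany(fetch_size)  # fetch in chunks, not all at once
            if not batch:
                break
            for row in batch:
                yield dict(zip(columns, row))
    finally:
        conn.close()


def write_rows(connect: Callable[[], Any], insert_sql: str,
               rows: Sequence[Sequence[Any]]) -> int:
    """Insert `rows` in a single transaction (hypothetical helper)."""
    conn = connect()
    try:
        cur = conn.cursor()
        cur.executemany(insert_sql, rows)
        conn.commit()
        return len(rows)
    finally:
        conn.close()


def demo() -> List[Dict[str, Any]]:
    """Round-trip two rows through a temporary SQLite file."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "calendar.db")

        def connect():
            # Assumption: a real pipeline would return a connection from a
            # PostgreSQL DB-API driver here instead of sqlite3.
            return sqlite3.connect(path)

        setup = connect()
        setup.execute("CREATE TABLE months (name TEXT, num INTEGER)")
        setup.commit()
        setup.close()

        # sqlite3 uses "?" placeholders; other DB-API drivers may use a
        # different paramstyle (for example "%s").
        write_rows(connect, "INSERT INTO months (name, num) VALUES (?, ?)",
                   [("Jan", 1), ("Feb", 2)])
        return list(read_rows(connect,
                              "SELECT name, num FROM months ORDER BY num"))


if __name__ == "__main__":
    for record in demo():
        print(record)
```

Passing a connect callable rather than an open connection keeps the helpers driver-agnostic and means each call opens and closes its own connection, which matters on distributed workers where connection objects generally cannot be serialized and shipped around.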