I want to read from a Pub/Sub topic and write the data to Bigtable using a Dataflow pipeline written in Python. I could find sample code in Java but not in Python. How can I assign the columns of a row read from Pub/Sub to different column families and write the data to Bigtable?
2
votes
2 Answers
4
votes
To write to Bigtable in a Dataflow pipeline, you need to create `DirectRow` objects and pass them to the `WriteToBigTable` transform. Here is a brief example that just passes in the row keys and adds one cell for each key — nothing too fancy:
import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import row
class MyOptions(PipelineOptions):
    """Command-line options that identify the destination Bigtable table."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # argparse exposes each dashed flag as an underscored attribute,
        # e.g. --bigtable-project becomes options.bigtable_project.
        flag_specs = (
            ('--bigtable-project',
             'The Bigtable project ID, this can be different than your '
             'Dataflow project',
             'bigtable-project'),
            ('--bigtable-instance',
             'The Bigtable instance ID',
             'bigtable-instance'),
            ('--bigtable-table',
             'The Bigtable table ID in the instance.',
             'bigtable-table'),
        )
        for flag, help_text, default_value in flag_specs:
            parser.add_argument(flag, help=help_text, default=default_value)
class CreateRowFn(beam.DoFn):
    """Turn each incoming row key into a Bigtable ``DirectRow`` with one cell.

    Every row receives the cell ``stats_summary:os_build = b"android"``.
    """

    def process(self, key):
        """Build the mutation for one row key.

        Args:
            key: Row key for the new row (this pipeline passes ``str`` keys).

        Returns:
            A single-element list holding the ``DirectRow``.
        """
        # Imported inside the method so the DoFn does not depend on module
        # globals being available on remote workers (this pipeline does not
        # set --save_main_session).
        import datetime

        from google.cloud.bigtable import row

        direct_row = row.DirectRow(row_key=key)
        direct_row.set_cell(
            "stats_summary",
            b"os_build",
            b"android",
            # Aware UTC timestamp: the Bigtable client treats a naive datetime
            # as UTC, so a naive local time would be shifted by the worker's
            # UTC offset.
            datetime.datetime.now(datetime.timezone.utc))
        return [direct_row]
def run(argv=None):
    """Build and run the pipeline.

    Two hard-coded row keys are turned into Bigtable rows by ``CreateRowFn``
    and written with ``WriteToBigTable`` to the table named by the options.

    Args:
        argv: Command-line flags, passed straight to ``MyOptions``.
    """
    options = MyOptions(argv)
    row_keys = [
        "phone#4c410523#20190501",
        "phone#4c410523#20190502",
    ]
    with beam.Pipeline(options=options) as pipeline:
        keys = pipeline | beam.Create(row_keys)
        rows = keys | beam.ParDo(CreateRowFn())
        sink = WriteToBigTable(
            project_id=options.bigtable_project,
            instance_id=options.bigtable_instance,
            table_id=options.bigtable_table)
        rows | sink  # pylint: disable=expression-not-assigned


if __name__ == '__main__':
    run()
I am just starting to explore this now and can link to a more polished version on GitHub once it's complete. Hope this helps you get started.
0
votes
Building on the previous answer and adding Pub/Sub, here's a working version.
Prerequisites
- GCS bucket created (for Dataflow temp/staging files)
- Pub/Sub topic created
- Pub/Sub subscription created
- Bigtable instance created
- Bigtable table created
- Bigtable column family created — this is mandatory, and if it is missing no visible error is reported!
Example of creating the column family with `cbt`:
cbt -instance test-instance createfamily test-table cf1
Code
Define and run the Dataflow pipeline.
# Packages
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud import pubsub_v1
# Classes
class CreateRowFn(beam.DoFn):
    """Convert each decoded Pub/Sub message into a Bigtable ``DirectRow``.

    The message text is used as the row key. By default every row receives a
    single cell ``cf1:field1 = 'value1'``; pass ``cells`` to write several
    columns, possibly spread across several column families.
    """

    # Cell layout used when the caller does not supply one.
    _DEFAULT_CELLS = {'cf1': {'field1': 'value1'}}

    def __init__(self, pipeline_options, cells=None):
        """Capture the target table names and the cell layout.

        Args:
            pipeline_options: Options exposing ``bigtable_instance`` and
                ``bigtable_table``; only those two values are kept.
            cells: Optional mapping ``{column_family: {column: value}}``.
                Every column family named here must already exist in the
                table. Defaults to ``{'cf1': {'field1': 'value1'}}``.
        """
        super().__init__()
        self.instance_id = pipeline_options.bigtable_instance
        self.table_id = pipeline_options.bigtable_table
        source = self._DEFAULT_CELLS if cells is None else cells
        # Copy so later mutation of the caller's dict cannot alter this DoFn.
        self.cells = {family: dict(columns)
                      for family, columns in source.items()}

    def process(self, key):
        """Yield one ``DirectRow`` keyed by ``key`` holding ``self.cells``."""
        # Imported locally so the DoFn does not depend on module globals
        # being present on the workers.
        import datetime

        from google.cloud.bigtable import row

        # One aware UTC timestamp shared by every cell of the row. The
        # Bigtable client treats a naive datetime as UTC, so a naive local
        # time would be shifted by the worker's UTC offset.
        timestamp = datetime.datetime.now(datetime.timezone.utc)
        direct_row = row.DirectRow(row_key=key)
        for family, columns in self.cells.items():
            for column, value in columns.items():
                direct_row.set_cell(family, column, value, timestamp=timestamp)
        yield direct_row
# Options
class XyzOptions(PipelineOptions):
    """Pipeline options naming the destination Bigtable table.

    Each flag is available as an attribute of the same name on the options
    object, e.g. ``pipeline_options.bigtable_table``.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        # Plain statements: no trailing commas, which would silently wrap
        # each call in a discarded tuple.
        parser.add_argument(
            '--bigtable_project', default='nested',
            help='Project ID that owns the Bigtable instance '
                 '(may differ from the Dataflow project).')
        parser.add_argument(
            '--bigtable_instance', default='instance',
            help='Bigtable instance ID.')
        parser.add_argument(
            '--bigtable_table', default='table',
            help='Bigtable table ID within the instance.')
# Deployment settings. These are placeholders: replace every value with your
# own before running (the snippet would otherwise fail with NameError).
PROJECT = 'my-gcp-project'
REGION = 'us-central1'
TEMP_LOCATION = 'gs://my-bucket/temp'
STAGING_LOCATION = 'gs://my-bucket/staging'
REQUIREMENTS_FILE = 'requirements.txt'
INSTANCE = 'test-instance'   # matches the cbt example above
TABLE = 'test-table'         # matches the cbt example above
SUBSCRIPTION = 'my-subscription'

# Options for a streaming Dataflow job that writes to Bigtable.
# save_main_session=True pickles this module's globals so workers can see them.
pipeline_options = XyzOptions(
    save_main_session=True,
    streaming=True,
    runner='DataflowRunner',
    project=PROJECT,
    region=REGION,
    temp_location=TEMP_LOCATION,
    staging_location=STAGING_LOCATION,
    requirements_file=REQUIREMENTS_FILE,
    bigtable_project=PROJECT,
    bigtable_instance=INSTANCE,
    bigtable_table=TABLE)
# Pipeline
def run(argv=None):
    """Stream Pub/Sub messages into Bigtable.

    Reads raw bytes from the configured subscription, decodes them as UTF-8,
    turns each message into a row with ``CreateRowFn`` and writes the rows to
    the table named in the module-level ``pipeline_options``. ``argv`` is
    accepted but not used.
    """
    with beam.Pipeline(options=pipeline_options) as p:
        input_subscription = f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"
        messages = p | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
            subscription=input_subscription).with_output_types(bytes)
        texts = messages | 'Conversion UTF-8 bytes to string' >> beam.Map(
            lambda payload: payload.decode('utf-8'))
        rows = texts | 'Conversion string to row object' >> beam.ParDo(
            CreateRowFn(pipeline_options))
        _ = rows | 'Writing row object to BigTable' >> WriteToBigTable(
            project_id=pipeline_options.bigtable_project,
            instance_id=pipeline_options.bigtable_instance,
            table_id=pipeline_options.bigtable_table)


if __name__ == '__main__':
    run()
Publish a message `b"phone#1111"` to the Pub/Sub topic (e.g. using the Python `PublisherClient()`).
Resulting table content (read back using happybase):
b'phone#1111': {b'cf1:field1': b'value1'}
Row length: 1