5
votes

I am fairly new to Apache Beam and am trying to write a pipeline in Python that extracts data from Google BigQuery and writes it to GCS in CSV format.

Using beam.io.Read(beam.io.BigQuerySource()) I am able to read the data from BigQuery, but I am not sure how to write it to GCS in CSV format.

Is there a custom function that does this? Could you please help me?

import logging

import apache_beam as beam


PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    # The pipeline must be built inside run(), where argv is defined.
    with beam.Pipeline(argv=argv) as p:

        # Execute the SQL in BigQuery and store the result set in the given destination BigQuery table.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='SELECT * FROM `dataset.table`', use_standard_sql=True))
        # Extract data from BigQuery to GCS in CSV format.
        # This is where I need your help

        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
            table='tablename',
            dataset='datasetname',
            project='project_id',
            schema='name:string,gender:string,count:integer',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
2
Welcome to Stack Overflow! Please take the tour and visit the help center to get the most out of this site. Please also share the relevant parts of the code you have developed so far. This helps in figuring out what the problem could be. – Adrian W

2 Answers

12
votes

You can do this with WriteToText, which lets you add a .csv suffix and a header line. Note that you first need to convert the query results to CSV-formatted strings. As an example, I used the Shakespeare public dataset and the following query:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

We now read the query results with:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

Each element of BQ_DATA is now a dictionary mapping column names to values:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

We can apply a beam.Map function to yield only values:

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

Excerpt of BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
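
Note that x.values() returns the values in whatever order the dictionary holds them, which in the excerpt above is corpus, word, word_count rather than the order of the SELECT list. A minimal sketch of one way to pin the column order explicitly; COLUMNS and row_to_values are hypothetical names introduced here for illustration, not part of Beam:

```python
# Hypothetical column order, chosen here to match the SELECT list of the example query.
COLUMNS = ['word', 'word_count', 'corpus']


def row_to_values(row, columns=COLUMNS):
    """Return the values of a BigQuery row dict in a fixed column order."""
    return [row[name] for name in columns]


example = {'corpus': 'hamlet', 'word': 'HAMLET', 'word_count': 407}
print(row_to_values(example))  # ['HAMLET', 407, 'hamlet']
```

In the pipeline this helper could take the place of the lambda, for example beam.Map(row_to_values), so that the values line up with whatever header is written later.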

And finally map again to have all column values separated by commas instead of a list (take into account that you would need to escape double quotes if they can appear within a field):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
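
If fields can contain double quotes or commas, one option is to let Python's standard csv module handle the quoting instead of joining strings by hand. The sketch below assumes Python 3; to_csv_line is a hypothetical helper name, and unlike the lambda above it emits no space after each comma:

```python
import csv
import io


def to_csv_line(values):
    """Format one row as a CSV line, quoting every field and escaping embedded quotes."""
    buffer = io.StringIO()
    # QUOTE_ALL mirrors the lambda above, which wraps every column in double quotes.
    # lineterminator='' because WriteToText appends the newline itself by default.
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator='')
    writer.writerow(values)
    return buffer.getvalue()


print(to_csv_line(['hamlet', 'say "hello"', 407]))
# "hamlet","say ""hello""","407"
```

It could be used in place of the lambda, for example beam.Map(to_csv_line).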

Now we write the results to GCS with the suffix and headers:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='word, word count, corpus')

Written results:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
word, word count, corpus
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
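
Putting the steps together, an end-to-end version might look like the sketch below. It is not a definitive implementation: it assumes Python 3 and a Beam release that provides beam.io.ReadFromBigQuery (on older releases the beam.io.Read(beam.io.BigQuerySource(...)) form shown above plays the same role). PROJECT and BUCKET are the placeholders from the question, COLUMNS and format_row are hypothetical names, and depending on the Beam version further Dataflow options such as a region may be required. Deriving the header from the same column list keeps the header and the data in the same order.

```python
import csv
import io

PROJECT = 'project_id'      # placeholder, as in the question
BUCKET = 'project_bucket'   # placeholder, as in the question
COLUMNS = ['word', 'word_count', 'corpus']  # hypothetical: fixes the output column order
QUERY = (
    'SELECT word, word_count, corpus '
    'FROM `bigquery-public-data.samples.shakespeare` '
    'WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10'
)


def format_row(row):
    """Turn one BigQuery row dict into a fully quoted CSV line in COLUMNS order."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator='')
    writer.writerow([row[name] for name in COLUMNS])
    return buffer.getvalue()


def run():
    # Imported lazily so format_row can be tried without Beam installed.
    import apache_beam as beam

    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner',
    ]
    with beam.Pipeline(argv=argv) as p:
        (p
         | 'read_bq' >> beam.io.ReadFromBigQuery(query=QUERY, use_standard_sql=True)
         | 'to_csv' >> beam.Map(format_row)
         | 'write_gcs' >> beam.io.WriteToText(
             'gs://{0}/results/output'.format(BUCKET),
             file_name_suffix='.csv',
             header=','.join(COLUMNS)))

# Call run() to submit the job once PROJECT and BUCKET point at real resources.
```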
1
votes

For anyone looking for an update for Python 3, replace the line

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

with

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))
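
The answer does not say why the change is needed. A likely reason (an assumption on my part) is that in Python 3 dict.values() returns a view object rather than a list, and such views cannot be pickled, whereas Beam generally has to serialize elements passed between steps. A small sketch in plain Python, without Beam, showing the difference:

```python
import pickle

row = {'corpus': 'hamlet', 'word': 'HAMLET', 'word_count': 407}

view = row.values()            # a dict_values view in Python 3, not a list
as_list = list(row.values())   # a plain list holding the same values

print(type(view).__name__)     # dict_values
print(as_list)                 # ['hamlet', 'HAMLET', 407]

# A dict view cannot be pickled, while the list can.
try:
    pickle.dumps(view)
    view_is_picklable = True
except TypeError:
    view_is_picklable = False
print(view_is_picklable)       # False
```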