
I'm trying to rename BigQuery columns in an Apache Beam pipeline in Python, as in the following example: I have one PCollection with the full data, and I want a second one with only three fields renamed, col1 to col1.2, col2 to col2.2, and so on.

How can I apply my filter correctly to get that second PCollection with the renamed columns?

def is_filtered(row):
    row['col1'] == row['col1.2']
    row['col2'] == row['col2.2']
    row['col3'] == row['col3.2']
    yield row


with beam.Pipeline() as pipeline:
    query = open('query.sql', 'r')
    bq_source = beam.io.BigQuerySource(query=query.read(),
                                       use_standard_sql=True)    
    main_table = \
        pipeline \
        | 'ReadBQData' >> beam.io.Read(bq_source)
    cycle_table = (
        pipeline 
        | 'FilterMainTable' >> beam.Filter(is_filtered, main_table))

I also thought about using Partition, but the Partition examples I found were more about partitioning the content of the rows than about partitioning the rows themselves.
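For context, a function passed to beam.Partition receives the element and the number of partitions and returns an integer index; here is a minimal plain-Python sketch of such a function, with a hypothetical routing rule using the column names above:

```python
# A function for beam.Partition takes (element, num_partitions) and
# returns an integer in [0, num_partitions). Hypothetical rule: send
# rows with an empty 'col1' to partition 1, everything else to 0.
def partition_fn(row, num_partitions):
    return 1 if not row.get('col1') else 0

# Inside a pipeline this would be applied as:
#   kept, rest = rows | beam.Partition(partition_fn, 2)
print(partition_fn({'col1': 'a'}, 2))  # 0
print(partition_fn({'col1': ''}, 2))   # 1
```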

1 Answer


The Filter operator creates a PCollection with rows removed from the source (its callable is expected to return a boolean). Use the Map operator if you want to create a PCollection with each row transformed 1:1. Here is an example:

def filter_columns(row):
    return {'col1.2': row['col1'],
            'col2.2': row['col2'],
            'col3.2': row['col3']}


with beam.Pipeline() as pipeline:
    query = open('query.sql', 'r')
    bq_source = beam.io.BigQuerySource(query=query.read(),
                                       use_standard_sql=True)    
    main_table = \
        pipeline \
        | 'ReadBQData' >> beam.io.Read(bq_source)

    cycle_table = (
        main_table 
        | 'FilterMainTable' >> beam.Map(filter_columns))
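As a quick sanity check you can run the Map callable on a sample row in plain Python, no pipeline needed; the sample values below are made up:

```python
def filter_columns(row):
    # Keep only three fields, renamed; any other columns are dropped.
    return {'col1.2': row['col1'],
            'col2.2': row['col2'],
            'col3.2': row['col3']}

sample = {'col1': 'a', 'col2': 'b', 'col3': 'c', 'col4': 'ignored'}
print(filter_columns(sample))
# {'col1.2': 'a', 'col2.2': 'b', 'col3.2': 'c'}
```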