0 votes

I'm trying to write to BigQuery from the Apache Beam PTransform WriteToBigQuery(), and when I provide the table with a lambda function that reads the value of the field "DEVICE" I get an error. I did this exact thing in a streaming job and it worked, but for some reason it isn't working in this batch job.

My pipeline options:

import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

options = pipeline_options.PipelineOptions(flags=[])
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'europe-west1'
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % 
    beam.version.__version__)

My code:

import json

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
       )

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        )
                 )

My entire Error message:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-12-ae8dd133c81b> in <module>
     13             table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
---> 14             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
     15                                                             )
     16                      )

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pvalue.py in __or__(self, ptransform)
    138 
    139   def __or__(self, ptransform):
--> 140     return self.pipeline.apply(ptransform, self)
    141 
    142 

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    596     if isinstance(transform, ptransform._NamedPTransform):
    597       return self.apply(
--> 598           transform.transform, pvalueish, label or transform.label)
    599 
    600     if not isinstance(transform, ptransform.PTransform):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    606       try:
    607         old_label, transform.label = transform.label, label
--> 608         return self.apply(transform, pvalueish)
    609       finally:
    610         transform.label = old_label

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
    649         transform.type_check_inputs(pvalueish)
    650 
--> 651       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
    652 
    653       if type_options is not None and type_options.pipeline_type_check:

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply(self, transform, input, options)
    151   def apply(self, transform, input, options):
    152     self._maybe_add_unified_worker_missing_options(options)
--> 153     return super(DataflowRunner, self).apply(transform, input, options)
    154 
    155   def _get_unique_step_name(self):

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/runner.py in apply(self, transform, input, options)
    196       m = getattr(self, 'apply_%s' % cls.__name__, None)
    197       if m:
--> 198         return m(transform, input, options)
    199     raise NotImplementedError(
    200         'Execution of [%s] not implemented in runner %s.' % (transform, self))

~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply_WriteToBigQuery(self, transform, pcoll, options)
    833       return pcoll | 'WriteToBigQuery' >> beam.io.Write(
    834           beam.io.BigQuerySink(
--> 835               transform.table_reference.tableId,
    836               transform.table_reference.datasetId,
    837               transform.table_reference.projectId,

AttributeError: 'function' object has no attribute 'tableId'

3 Answers

0 votes

According to the documentation and to this thread https://stackoverflow.com/a/62146803/5283663, it looks like you need to specify the schema parameter.

Does this solve the issue?

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
       )

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        schema=set_schema,            
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                        )
                 )
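
The answer above uses schema=set_schema without showing that helper. As a rough sketch (an assumption for illustration, not part of the original answer), set_schema could be a callable that WriteToBigQuery invokes per destination table and that returns the table's schema; the field names below are placeholders:

def set_schema(destination):
    # Hypothetical schema callable: WriteToBigQuery calls it with each
    # destination table and expects a schema back. The fields here are
    # placeholders, not taken from the original question.
    return {
        'fields': [
            {'name': 'DEVICE', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
            {'name': 'value', 'type': 'FLOAT', 'mode': 'NULLABLE'},
        ]
    }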
0 votes

It seems to be an issue with the DataflowRunner itself. I made a quick example and got the same error. I tried SDKs from 2.11.0 up to 2.21.0, and I remember I made a code sample of this about a year ago with 2.13.0, so I think what changed is the DataflowRunner itself.

If you use DirectRunner, it works fine. Sample code:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition

# pipeline_options and known_args come from the usual argparse /
# PipelineOptions setup (omitted here, see the sketch after this block).
with beam.Pipeline(options=pipeline_options) as p:
    elements = [
        {'number': 1, 'table': "table1"},
        {'number': 2, 'table': "table2"},
        {'number': 3, 'table': "table1"},
    ]

    schema = 'number:integer'

    def get_table(element):
        # Pick the destination table from the element and drop the routing
        # field so it is not written to BigQuery.
        table = element.pop('table')
        return f"{known_args.project}:{known_args.dataset}.{table}"

    dyn_bq = (
        p
        | beam.Create(elements)
        | WriteToBigQuery(table=get_table,
                          schema=schema,
                          create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                          write_disposition=BigQueryDisposition.WRITE_APPEND)
    )
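
For completeness, a minimal sketch of how pipeline_options and known_args could be built for a DirectRunner test (this is an assumption about the omitted setup, not part of the original answer; the --project and --dataset flags are made up for illustration):

import argparse
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument('--project', required=True)
parser.add_argument('--dataset', required=True)
known_args, pipeline_args = parser.parse_known_args()

# DirectRunner is the default runner; passing it explicitly just makes the
# intent unambiguous.
pipeline_options = PipelineOptions(pipeline_args, runner='DirectRunner')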

I don't see any workaround possible. I'll file a public issue and update this answer.

0 votes

If you look at how the table parameter is parsed, when it is given a callable it is stored directly as that callable. So later, the DataflowRunner code tries to access table-reference attributes (such as tableId) on that function, which is not valid.

Could you please try providing a tuple/TableReference instead of a lambda?

For example,

table = bigquery_tools.parse_table_reference(f'project:dataset.{element["DEVICE"]}__opdata')
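
As a minimal sketch of how that suggestion could be wired up (assuming a single, fixed device name, since a static TableReference cannot vary per element; the device name and the formatted variable below are placeholders, not from the original question):

from apache_beam.io.gcp import bigquery_tools

# Placeholder device name: a static TableReference targets one fixed table.
device = 'device_01'
table_ref = bigquery_tools.parse_table_reference(
    f'project:dataset.{device}__opdata')

# 'formatted' stands for the PCollection produced by "Format telemetry data".
formatted | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
    table=table_ref,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)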