I'm trying to write to BigQuery with the Apache Beam PTransform WriteToBigQuery(), and when I provide the table as a lambda function that reads the value of the field "DEVICE" I get an error. I did essentially the same thing in a streaming job and it worked (a rough sketch of that pipeline is at the end of this post), but for some reason it isn't working in this batch job.
My pipeline options:
import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
options = pipeline_options.PipelineOptions(flags=[])
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
options.view_as(GoogleCloudOptions).region = 'europe-west1'
options.view_as(pipeline_options.SetupOptions).sdk_location = (
    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
    beam.version.__version__)
My code:
import json

p = beam.Pipeline(DataflowRunner(), options=options)

data = (p
    | "Read text" >> beam.io.textio.ReadFromText(f'gs://{bucket_name}/{file}')
    | "Parse json" >> beam.ParDo(lambda element: json.loads(element))
)

telemetry_data = (data
    | "Filter telemetry data" >> beam.Filter(lambda element: element['type_MQTT'] == 'telemetry_data')
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )
)
My full error message:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-12-ae8dd133c81b> in <module>
13 table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
---> 14 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
15 )
16 )
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pvalue.py in __or__(self, ptransform)
138
139 def __or__(self, ptransform):
--> 140 return self.pipeline.apply(ptransform, self)
141
142
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
596 if isinstance(transform, ptransform._NamedPTransform):
597 return self.apply(
--> 598 transform.transform, pvalueish, label or transform.label)
599
600 if not isinstance(transform, ptransform.PTransform):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
606 try:
607 old_label, transform.label = transform.label, label
--> 608 return self.apply(transform, pvalueish)
609 finally:
610 transform.label = old_label
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/pipeline.py in apply(self, transform, pvalueish, label)
649 transform.type_check_inputs(pvalueish)
650
--> 651 pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
652
653 if type_options is not None and type_options.pipeline_type_check:
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply(self, transform, input, options)
151 def apply(self, transform, input, options):
152 self._maybe_add_unified_worker_missing_options(options)
--> 153 return super(DataflowRunner, self).apply(transform, input, options)
154
155 def _get_unique_step_name(self):
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/runner.py in apply(self, transform, input, options)
196 m = getattr(self, 'apply_%s' % cls.__name__, None)
197 if m:
--> 198 return m(transform, input, options)
199 raise NotImplementedError(
200 'Execution of [%s] not implemented in runner %s.' % (transform, self))
~/apache-beam-custom/packages/beam/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py in apply_WriteToBigQuery(self, transform, pcoll, options)
833 return pcoll | 'WriteToBigQuery' >> beam.io.Write(
834 beam.io.BigQuerySink(
--> 835 transform.table_reference.tableId,
836 transform.table_reference.datasetId,
837 transform.table_reference.projectId,
AttributeError: 'function' object has no attribute 'tableId'
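
For comparison, this is roughly what the streaming job where the table callable worked looked like. I'm reconstructing it from memory, so the Pub/Sub topic name and the parsing steps are placeholders; the relevant part is that WriteToBigQuery received the same kind of table lambda:

# Rough sketch of the streaming job (names are placeholders, not the real job).
# The schema / create_disposition details are omitted here.
streaming_options = pipeline_options.PipelineOptions(streaming=True)

p_stream = beam.Pipeline(DataflowRunner(), options=streaming_options)

telemetry_stream = (p_stream
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic='projects/project/topics/telemetry')
    | "Parse json" >> beam.Map(json.loads)
    | "Format telemetry data" >> beam.Map(format_telemetry)
    | "Telemetry data to bq" >> beam.io.WriteToBigQuery(
        table = lambda element: f'project:dataset.{element["DEVICE"]}__opdata',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    )
)

Is providing the table as a callable supported for batch jobs on the DataflowRunner, or is that only supported in streaming mode?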