I have a Beam pipeline running on the Dataflow runner. It takes in XML and outputs JSON, which then has to be stored in a BigQuery table. Earlier, I was writing the newline-delimited JSON into a GCS bucket with the Beam pipeline and creating a BQ table from that file without making any changes to it (using the BigQuery console). That job runs successfully and the data gets imported into BQ without any hiccups.
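For reference, here is roughly what that manual console import amounts to, sketched with the google-cloud-bigquery client instead of the console (the bucket path and table name are placeholders, not my real ones):

from google.cloud import bigquery

client = bigquery.Client(project='project1')

# Load the newline-delimited JSON file the pipeline wrote to GCS,
# letting BigQuery infer the schema, as the console import did.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
)
load_job = client.load_table_from_uri(
    'gs://bucket/output.json',   # placeholder: NDJSON output written by the pipeline
    'project1.dataset.table',    # placeholder: destination table
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish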
Now I have modified the pipeline to write the output JSON rows directly into the BQ table, using Apache Beam's beam.io.WriteToBigQuery transform. The PCollection consists of JSON objects, where each element contains a single object (one row) for BQ.
Below is the sample input that goes into WriteToBigQuery:
{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}
The relevant part of my code is as below:
import json

import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions, PipelineOptions, StandardOptions)

# ReadGCSfile, converttojson, print_row and input_file_path are defined
# elsewhere in my code (omitted here).


def run(argv=None):
    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'project_name'
    google_cloud_options.job_name = 'jobid'
    google_cloud_options.staging_location = 'gs://bucket/staging'
    google_cloud_options.temp_location = 'gs://bucket/temp'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    p = beam.Pipeline(options=options)
    table_spec = 'project:dataset.table'

    data = (p
            | 'Create' >> beam.Create([input_file_path])
            | 'GetXML' >> beam.ParDo(ReadGCSfile())
            # | 'Convert2JSON' >> beam.ParDo(converttojson())
            | 'Convert2json' >> beam.Map(lambda orders: json.dumps(orders))
            # | beam.Map(print_row)
            )

    project_id = 'project1'
    dataset_id = 'dataset'
    table_id = 'table'
    table_schema = ('browser_id:STRING, merch_net_price:FLOAT, order_no:INTEGER, '
                    'order_tax:FLOAT, shipping_net_price:FLOAT, '
                    'order_gross_price:FLOAT, order_date:TIMESTAMP')

    data | 'write' >> beam.io.WriteToBigQuery(
        table=table_id,
        dataset=dataset_id,
        project=project_id,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    p.run()
The error when I run this pipeline is as below:
AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']
I think the error is due to the return type of the previous step, or something related to streaming versus batch loading into BigQuery. I want to do a batch load in my case. I have tried the example insert pipeline given in the Apache Beam documentation (Writing to a BigQuery table), and that pipeline works. The form of the data there is as below:
quotes = p | beam.Create([
    {'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
    {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
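If it helps to see the difference I mean: the documentation example hands WriteToBigQuery dict elements, while my 'Convert2json' step emits JSON strings. A small standalone sketch (DirectRunner, just printing element types; not my actual pipeline):

import json
import apache_beam as beam

sample = {'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."}

with beam.Pipeline() as p:
    # Shape used by the documentation example: each element is a dict.
    dicts = p | 'CreateDicts' >> beam.Create([sample])
    dicts | 'DictType' >> beam.Map(lambda e: print(type(e)))    # <class 'dict'>

    # Shape my pipeline produces after beam.Map(json.dumps): each element is a str.
    strings = (p
               | 'CreateForDump' >> beam.Create([sample])
               | 'Dump' >> beam.Map(json.dumps))
    strings | 'StrType' >> beam.Map(lambda e: print(type(e)))   # <class 'str'>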
How can I modify my pipeline so that the string-type data in my case gets written to the BigQuery table?