I'm using API calls to retrieve data from a service. The data is nested JSON, with arrays that may themselves contain JSON objects.

Example: [image not available]

Basically, I want to upload it to tables in BigQuery. I created one table per array, and JSON objects are unpacked into the same table as their parent. For example:

Orders: All customer fields, all ShippingAdress, orderDateUtc etc..
Orders_items:  orderid, discountEach, giftTo etc..
Order_items_historicalCategories: ....
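
A minimal sketch of that split, to make the layout concrete. The key names `orderId`, `itemId`, `items`, and `customer`, and the helpers `flatten` and `split_order`, are assumptions for illustration and not the service's actual field names; it also assumes each entry in `historicalCategories` is itself a JSON object.

```python
def flatten(obj, prefix=""):
    """Unpack nested dicts into one flat dict; lists are skipped (they get their own table)."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            # Nested object: merge its fields into the same row, prefixed by the parent key.
            flat.update(flatten(value, prefix=f"{name}_"))
        elif not isinstance(value, list):
            flat[name] = value
    return flat


def split_order(order):
    """Return rows for the orders, order-items, and historical-categories tables."""
    order_rows = [flatten(order)]
    item_rows, category_rows = [], []
    for item in order.get("items", []):
        # Each child row carries the parent key so the tables can be joined later.
        item_rows.append({"orderId": order["orderId"], **flatten(item)})
        for category in item.get("historicalCategories", []):
            category_rows.append({
                "orderId": order["orderId"],
                "itemId": item["itemId"],
                **flatten(category),
            })
    return order_rows, item_rows, category_rows
```

Each of the three returned lists would then map to one destination table.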

I'm not sure about the best way to do that. I can create CSV files from the API call (data stream) and then use COPY per CSV to upload them, but that seems excessive. I'm looking for a way to skip the CSV creation.

Is there an operator or package that can handle this data and upload it directly to the tables? I assume that what I need has already been done by many other organizations, yet I didn't see any "built-in" method for it in the docs: https://cloud.google.com/bigquery/docs/loading-data

Any help would be appreciated.

1 Answer

You basically have to follow the documentation on how to load JSON data using Python and on using nested and repeated fields. For instance, using the schema from the latter, you can load nested and repeated JSON data in the following manner (you can test with the sample data from the documentation):

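import sys

from google.cloud import bigquery


def load_nested_json():
    client = bigquery.Client()

    # Expects three command-line arguments: dataset ID, table ID, and source URI.
    dataset_id, table_id, uri = sys.argv[1:]
    # client.dataset() is deprecated in recent library versions; build the reference directly.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    job_config = bigquery.LoadJobConfig()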
    job_config.schema = [
        bigquery.SchemaField('id', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('first_name', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('last_name', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('dob', 'DATE', mode='NULLABLE'),
        bigquery.SchemaField('addresses', 'RECORD', mode='REPEATED', fields=[
            bigquery.SchemaField('status', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('address', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('city', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('state', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('zip', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('numberOfYears', 'STRING', mode='NULLABLE'),
        ]),
    ]
    table_ref = dataset_ref.table(table_id)
    # Uncomment following lines to also create the destination table
    # table = bigquery.Table(table_ref, job_config.schema)
    # table = client.create_table(table)

    # print('Created table {}'.format(table.full_table_id))

    # The source must contain one JSON object per line (newline-delimited JSON).
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON

    load_job = client.load_table_from_uri(
        uri,
        table_ref,
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'

if __name__ == '__main__':
    load_nested_json()
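
If the records come straight from an API call rather than from a file in Cloud Storage, one option is to serialize them to newline-delimited JSON in memory and hand that buffer to the client, which avoids writing any intermediate file. The following is only a sketch: `to_ndjson_buffer` and the sample record are hypothetical, and the commented-out lines assume `client`, `table_ref`, and `job_config` are set up as in the function above.

```python
import io
import json


def to_ndjson_buffer(records):
    """Serialize a list of dicts as newline-delimited JSON in an in-memory buffer."""
    lines = (json.dumps(record) for record in records)
    data = "\n".join(lines).encode("utf-8")
    return io.BytesIO(data)


# Hypothetical API response shaped like the schema above.
records = [
    {
        "id": "1",
        "first_name": "John",
        "last_name": "Doe",
        "dob": "1968-01-22",
        "addresses": [
            {
                "status": "current",
                "address": "123 First Avenue",
                "city": "Seattle",
                "state": "WA",
                "zip": "11111",
                "numberOfYears": "1",
            },
        ],
    },
]

buffer = to_ndjson_buffer(records)

# With a real client, the buffer could then be loaded without touching disk:
# load_job = client.load_table_from_file(buffer, table_ref, job_config=job_config)
# load_job.result()
```

Because the nested `addresses` array stays inside each record, this keeps everything in a single table with a REPEATED RECORD column instead of one table per array.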