3 votes

I'm trying to load JSON events data from Google Cloud Storage into BigQuery, and I want to load everything as strings and cast them later, since some messages look like this:

{"id":"123"}
{"id":234}

The schema.json I wrote is:

[
    {
        "name": "id",
        "type": "STRING",
        "mode": "NULLABLE"
    }
]

and I load it with the bq CLI:

LOC="--location US"
INPUT=sample.json
SCHEMA=schema.json
bq $LOC load \
    --source_format=NEWLINE_DELIMITED_JSON \
    --ignore_unknown_values \
    --schema=$SCHEMA \
    nov2020.test \
    $INPUT

It fails with this error:

Failure details:

  • Error while reading data, error message: JSON processing encountered too many errors, giving up. Rows: 2; errors: 1; max bad: 0; error percent: 0
  • Error while reading data, error message: JSON parsing error in row starting at position 13: Could not convert value to string. Field: id; Value: 234

I don't want to skip these records with --max_bad_records, and I thought that by not using autodetect I would be able to read everything as strings.

I have about 80 GB of these JSON files every day that I want to process daily, so what can I do to deal with this error? Is my only option to go through each JSON message and format the id field before loading it into BigQuery?
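If preprocessing really is the way to go, I picture something roughly like the sketch below; this is just an illustration of the per-record fix-up I was hoping to avoid, assuming newline-delimited files and that only the id field needs coercing:

    import json
    import sys

    # Sketch: read newline-delimited JSON on stdin, force "id" to a string,
    # and print the fixed records so they can be loaded with bq afterwards.
    for line in sys.stdin:
        record = json.loads(line)
        if "id" in record and record["id"] is not None:
            record["id"] = str(record["id"])
        print(json.dumps(record))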

I found another post saying to use a plugin for fluentd, but I'm not sure that applies here since my data was not created with fluentd.

Thanks!

The point is: you have JSON with conflicting types for the same field, and BigQuery will always raise an error in those cases. You will need an intermediary process to transform your JSON before loading it into BigQuery. – rmesteves
Can you let me know whether the different types for the same field appear in different files, or whether this can happen within the same file? – rmesteves
@R.Esteves This situation can happen within the same file. – alyafitz
I provided a possible solution for your problem. If it helps you somehow, consider upvoting and accepting :) – rmesteves

1 Answer

2 votes

The simplest way to solve your problem is to replace your load job with a Dataflow job. The code below basically reads the files in the bucket, fixes the JSON records, and then writes the fixed records to BigQuery.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.internal.clients import bigquery
import json

class FixJson(beam.DoFn):
    def __init__(self):
        beam.DoFn.__init__(self)

    def process(self, element, *args, **kwargs):
        row = str(element)
        json_obj = json.loads(row)
        json_obj["field"] = str(json_obj["field"])
        return [json_obj]


table_spec = bigquery.TableReference(
    projectId='<your-project>',
    datasetId='<your-dataset>',
    tableId='<your-table>')

p = beam.Pipeline(options=PipelineOptions())
p1 = p | "Read data from GCS" >> beam.io.ReadFromText('gs://<your-bucket>/*') \
        | "Fix json" >> beam.ParDo(FixJson())\
        | "Write to bq" >> beam.io.WriteToBigQuery(table_spec,
                                            custom_gcs_temp_location = '<some-temporary-bucket>',
                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

p.run()
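A side note: PipelineOptions() with no arguments runs the pipeline locally with the DirectRunner. To actually submit the job to Dataflow, you would pass options roughly like the ones below; the project, region and temp bucket are placeholders to replace with your own values:

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=<your-project>',
        '--region=us-central1',  # assumed region, change as needed
        '--temp_location=gs://<some-temporary-bucket>/tmp',
    ])

    p = beam.Pipeline(options=options)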

Let's break this pipeline into parts and go through it step by step:

  1. In the first step of the pipeline, Dataflow reads as text all the files inside the provided bucket. You should change this path so it matches the data that must be inserted into BigQuery for the given day.

    "Read data from GCS" >> beam.io.ReadFromText('gs:///*')

  2. In the second step, Dataflow uses the FixJson function inside a ParDo transformation to change the structure of your JSON records. You should adapt the logic to your needs, depending on how complex your data is and how much you have to change it (an adapted sketch for the question's id field follows this list). The function's logic basically loads each string record as a JSON object and converts the values of certain keys to strings.

    class FixJson(beam.DoFn):
        def __init__(self):
            beam.DoFn.__init__(self)
        def process(self, element, *args, **kwargs):
            row = str(element)
            json_obj = json.loads(row)
            json_obj["field"] = str(json_obj["field"])
            return [json_obj]
    
    ...
    
    "Fix json" >> beam.ParDo(FixJson())\
    
  3. Finally, in the last step we write the data to BigQuery. To specify the destination table, we use the table_spec variable created earlier, as you can see in the code.

    table_spec = bigquery.TableReference(
        projectId='<your-project>',
        datasetId='<your-dataset>',
        tableId='<your-table>')
    
    ...
    
    "Write to bq" >> beam.io.WriteToBigQuery(table_spec,
                                            custom_gcs_temp_location = '<some-temporary-bucket>',                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,                                          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
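For the data in the question specifically, where the key is id rather than field and the coercion should not break on records that happen to lack the key, the DoFn could be adapted roughly like this (the missing-key guard is my own assumption):

    class FixJson(beam.DoFn):
        def process(self, element, *args, **kwargs):
            json_obj = json.loads(element)
            # Only coerce when the key is present; records without "id"
            # pass through unchanged.
            if "id" in json_obj and json_obj["id"] is not None:
                json_obj["id"] = str(json_obj["id"])
            return [json_obj]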
    

To finish, I'd like to make two comments:

  1. This code assumes that the table is already created with the correct schema (an alternative write step that creates the table is sketched after this list).

  2. I tested this code with the sample data below:

    {"field" : 123}
    {"field" : 23}
    {"field" : 3}
    {"field" : "9123"}
    {"field" : "45"}
    {"field" : "12"}
    {"field" : 1}
    {"field" : "13"}