Here's the situation: I have a set of files in GCS that are compressed and have a .gz file extension (e.g. 000000_[0-5].gz) that I am trying to import into a single BQ table. I have been running load jobs from the command line to date, but I want to accomplish this with Dataflow, potentially adding some transformations in the future.
The data in the compressed GCS files is a complex JSON structure whose schema changes frequently, so it is easiest to load each file into BigQuery as a TSV with only one column, called record, and then use JSON_EXTRACT functions within BQ to parse out the values needed at the time they are needed.
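For reference, downstream consumption of that record column looks roughly like the sketch below, written here with the google-cloud-bigquery Python client rather than the bq CLI I normally use; the JSON path ('$.some_field') and the fully qualified table name are just placeholders:

from google.cloud import bigquery

client = bigquery.Client(project='GCP_PROJECT_NAME')

# Parse fields out of the raw JSON string at query time.
# '$.some_field' is a placeholder; the real paths vary as the schema changes.
sql = """
    SELECT JSON_EXTRACT_SCALAR(record, '$.some_field') AS some_field
    FROM `GCP_PROJECT_NAME.DATASET_NAME.TABLE_NAME`
    LIMIT 10
"""

for row in client.query(sql).result():
    print(row.some_field)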
Issue: I have written a Dataflow pipeline that does the bare minimum in this scenario: read from GCS and write to a BigQuery table. When I execute this pipeline, however, I get a JSON parse error, shown here:
Error while reading data, error message: JSON table encountered too
many errors, giving up. Rows: 1; errors: 1., error: Error while reading
data, error message: JSON table encountered too many errors, giving up.
Rows: 1; errors: 1., error: Error while reading data, error message:
JSON parsing error in row starting at position 2630029539: Value
encountered without start of object.
Below is my Dataflow script with some variables anonymized.
from __future__ import absolute_import

import argparse
import logging
import re
import json

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://BUCKET_NAME/input-data/000000_0.gz',
                        help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--runner=DataflowRunner',
        '--project=GCP_PROJECT_NAME',
        '--staging_location=gs://BUCKET_NAME/dataflow-staging',
        '--temp_location=gs://BUCKET_NAME/dataflow-temp',
        '--job_name=gcs-gzcomp-to-bq1',
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        # Read each line of the gzipped input file and write it straight to
        # BigQuery, into a table with a single string column.
        (p
         | "ReadFromGCS" >> ReadFromText(known_args.input)
         | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
                           project='GCP_PROJECT_NAME', schema='record:string'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
As you can see, I attempted to do the same thing as in the traditional load job by specifying a schema containing only one column with a string type, but it is still failing.
Is there a way to explicitly tell Dataflow more about how I want to import the GCS files? i.e. can I specify TSV even though each line is a valid JSON object?
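For example, one thing I considered (purely a guess on my part, so it may be off base) is swapping the pipeline section of the script above for something that wraps each line in a dict keyed by the column name before the BigQuery sink, along these lines:

    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | "ReadFromGCS" >> ReadFromText(known_args.input)
         # Guessing that WriteToBigQuery wants dict-like rows rather than raw strings.
         | "WrapInRecord" >> beam.Map(lambda line: {'record': line})
         | "WriteToBQ" >> WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
                                          project='GCP_PROJECT_NAME',
                                          schema='record:string'))

But I don't know whether that, some read-side option on ReadFromText, or something else entirely is the right direction here.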
Also, if this error is related to anything else I may have screwed up, please call that out as well; I'm super new to Dataflow, but pretty experienced with BQ and some other GCP tools, so I'm hoping to add this to my toolbelt.