
I need to load a lot of JSON files into BQ using Apache Beam in Python. The JSONs have a pretty complex schema (multiple levels of hierarchy) and, more importantly, it is not consistent: some fields are so rare that they appear in only 0.01% of the JSONs. I can't let BQ infer the schema in the WriteToBigQuery method using AUTO_DETECT because it only examines 100 rows, which is not nearly enough. I tried building a schema against 0.1% of the data using the Python generate-schema utility, but again, some fields are so rare that it still fails with:

No such field: FIELD_NAME.

I tried to find a way to upload the files regardless of any errors, saving the failed rows to an error table that I could handle separately, but I didn't find any way to do so in the WriteToBigQuery module. I also tried validating each JSON before sending it to the pipeline, but that was extremely slow. Filtering each JSON against a specified schema was just as slow, since it also requires walking the whole document (each JSON is about 13 KB).

Did anyone come across anything that can help? It is strange that there is no max_rejected-style attribute to use when writing to BQ with Apache Beam. Any idea on how to handle this would be appreciated.


2 Answers


One possibility is to calculate the schema 'manually'. If we represent each row's schema as a set of (field, type) tuples, for example set([('name', str), ('age', int)]), we can merge the schemas observed for each destination table before writing.
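As a starting point, here is a minimal sketch of what the get_row_schema helper used below might look like. The function name and the dotted-path convention for nested fields are my own assumptions, not something from the original answer:

```python
def get_row_schema(row, prefix=""):
    """Walk a (possibly nested) dict and collect (dotted_field_path, type) tuples."""
    schema = set()
    for key, value in row.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            # Recurse into nested objects, joining names with dots.
            schema |= get_row_schema(value, prefix=path + ".")
        else:
            schema.add((path, type(value)))
    return schema
```

For example, get_row_schema({'name': 'a', 'meta': {'age': 3}}) yields {('name', str), ('meta.age', int)}.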

from collections import defaultdict

import apache_beam as beam


class CombineSchemasByDestination(beam.DoFn):
  """Merges the per-row schema sets observed for each destination table."""

  def start_bundle(self):
    self.schemas_per_dest = defaultdict(set)

  def process(self, dest_schemas):
    destination, schemas = dest_schemas
    for s in schemas:
      # Note: set.union returns a new set; update mutates in place.
      self.schemas_per_dest[destination].update(s)

  def finish_bundle(self):
    for dest, schema in self.schemas_per_dest.items():
      yield (dest, schema)

schemas_per_dest = (my_data
                    | beam.Map(lambda row: (get_destination(row),
                                            get_row_schema(row)))
                    | beam.GroupByKey()
                    | beam.ParDo(CombineSchemasByDestination()))

my_data | beam.io.WriteToBigQuery(
    ...,  # table, write_disposition, etc.
    schema=lambda dest, schema_map: schema_map.get(dest),
    schema_side_inputs=(beam.pvalue.AsDict(schemas_per_dest),))

I'd think this should help solve your problem. Thoughts?
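One caveat: WriteToBigQuery expects a BigQuery schema, not a set of Python (name, type) tuples, so a small conversion step would be needed before the schema callable can use it. A minimal sketch for flat fields only (the PY_TO_BQ mapping and function name are my own; nested fields would additionally need RECORD types):

```python
# Assumed mapping from Python types to BigQuery column types.
PY_TO_BQ = {str: "STRING", int: "INTEGER", float: "FLOAT", bool: "BOOLEAN"}

def tuples_to_bq_schema(schema_tuples):
    """Render a set of (name, py_type) tuples as the 'name:TYPE,...' string
    form that WriteToBigQuery accepts for flat schemas."""
    return ",".join(
        f"{name}:{PY_TO_BQ.get(py_type, 'STRING')}"
        for name, py_type in sorted(schema_tuples, key=lambda t: t[0])
    )
```

For example, tuples_to_bq_schema({('age', int), ('name', str)}) gives "age:INTEGER,name:STRING".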


What I ended up doing was reformatting the JSON according to the errors I kept getting from BQ. I noticed that the missing fields were always nested entirely under 2-3 fields in the JSON, so I just converted those fields as-is to JSON strings, and this way I loaded the data successfully. Still, an error-log table with a max_rejected setting in Apache Beam would have been extremely helpful.
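The workaround above could be sketched as a small preprocessing step applied to each row before WriteToBigQuery. The field names here ('payload', 'extras') are placeholders of my own, since the original post doesn't name the volatile fields:

```python
import json

# Hypothetical names for the 2-3 subtrees holding the rare fields.
VOLATILE_FIELDS = ("payload", "extras")

def stringify_volatile_fields(row):
    """Serialize the volatile subtrees to JSON strings so BigQuery only
    ever sees stable top-level fields of type STRING."""
    out = dict(row)
    for field in VOLATILE_FIELDS:
        if field in out:
            out[field] = json.dumps(out[field])
    return out
```

The stringified columns can then be queried in BigQuery with its JSON functions (e.g. JSON_EXTRACT), at the cost of losing native nested-column access.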