
I am facing a problem in Dataflow. I used the Python BigQuery API, and it works fine with autodetect: the load job creates the table and appends values at the same time:

...
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.create_disposition = 'CREATE_IF_NEEDED'  # trailing commas removed: they turned these assignments into tuples
job_config.source_format = 'CSV'
job_config.field_delimiter = '|'
job_config.skip_leading_rows = 1

load_job = client.load_table_from_uri(
    uri,
    table_id,
    location=region,
    job_config=job_config)

So I need to use Dataflow to scale the transfer from GCS to BigQuery, but I can't find resources on how to use autodetect with WriteToBigQuery in Apache Beam.

class DataIngestion:
    def parse_method(self, string_input):
        values = re.split(",",
                          re.sub(r'\r\n', '', re.sub('"', '', string_input)))
        row = dict(
            zip(('date', 'volume', 'open', 'close', 'high', 'low', 'adjclose'),
                values))
        return row
...
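To make the row format concrete, here is a standalone sketch of what parse_method produces (the sample CSV line is made up for illustration):

```python
import re

def parse_method(string_input):
    # Remove quotes and any CRLF, then split the CSV line on commas.
    values = re.split(",", re.sub(r'\r\n', '', re.sub('"', '', string_input)))
    # Map the values onto the expected column names.
    return dict(zip(('date', 'volume', 'open', 'close', 'high', 'low', 'adjclose'),
                    values))

row = parse_method('"2020-01-02",1000,3.0,3.2,3.3,2.9,3.1')
# row is a dict like {'date': '2020-01-02', 'volume': '1000', ...},
# which is the {"column_name": value} shape WriteToBigQuery expects.
```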

    (p
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input, 
                                                  skip_header_lines=1)
     | 'String To BigQuery Row' >>
     beam.Map(lambda s: data_ingestion.parse_method(s))
     #| 'Parse file' >> beam.Map(parse_file)
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             custom_gcs_temp_location="gs://XXXX",
             create_disposition="CREATE_IF_NEEDED",
             write_disposition="WRITE_APPEND"))
    p.run().wait_until_finish()

So I have a parse_method too, but I also tried returning only the values, without the dict...

Values passed to WriteToBigQuery should be in the form of a dict {"column_name": value, ...}. Maybe you can find more info in the sink docs beam.apache.org/releases/pydoc/2.28.0/… - Tudor Plugaru

1 Answer


Try passing schema='SCHEMA_AUTODETECT' to the WriteToBigQuery transform. That should enable schema autodetection.
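For example, the pipeline from the question could be adapted like this. This is a sketch, not a tested pipeline: known_args, data_ingestion, and the gs://XXXX temp location are placeholders carried over from the question.

```python
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input,
                                                  skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             schema='SCHEMA_AUTODETECT',  # let BigQuery infer the schema
             custom_gcs_temp_location="gs://XXXX",
             create_disposition="CREATE_IF_NEEDED",
             write_disposition="WRITE_APPEND"))
```

Note that with autodetection all values parsed from the CSV arrive as strings, so BigQuery infers column types from the staged data rather than from your dicts.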