I am facing a problem in Dataflow. I used the Python BigQuery API, and it works fine with autodetect. It runs fine; the job_config creates the table and appends values at the same time:
...
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
job_config.create_disposition = 'CREATE_IF_NEEDED'  # no trailing comma: 'CREATE_IF_NEEDED', would bind a tuple
job_config.source_format = 'CSV'
job_config.field_delimiter = '|'
job_config.skip_leading_rows = 1
load_job = client.load_table_from_uri(
uri,
table_id,
location=region,
job_config=job_config,)
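One pitfall worth noting with these config assignments: a stray trailing comma (e.g. `= 'CSV',`) binds a one-element tuple rather than the string, which silently misconfigures the load job. A minimal illustration:

```python
# A trailing comma turns an assignment into a one-element tuple -- a silent bug.
with_comma = 'CSV',      # this is the tuple ('CSV',)
without_comma = 'CSV'    # this is the string 'CSV'

print(type(with_comma).__name__)     # -> tuple
print(type(without_comma).__name__)  # -> str
```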
So, I need to use Dataflow to scale the transfer from GCS to BigQuery, but I can't find resources on how to use autodetect with WriteToBigQuery in Apache Beam.
class DataIngestion:
    def parse_method(self, string_input):
        values = re.split(',',
                          re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('date', 'volume', 'open', 'close', 'high', 'low', 'adjclose'),
                values))
        return row
...
(p
 | 'Read from a File' >> beam.io.ReadFromText(known_args.input,
                                              skip_header_lines=1)
 | 'String To BigQuery Row' >>
     beam.Map(lambda s: data_ingestion.parse_method(s))
 # | 'Parse file' >> beam.Map(parse_file)
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     known_args.output,
     custom_gcs_temp_location="gs://XXXX",
     create_disposition="CREATE_IF_NEEDED",
     write_disposition="WRITE_APPEND"))
p.run().wait_until_finish()
So, I have a parse_method too, but I also tried returning only the values, without building a dict...
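For reference, each element handed to WriteToBigQuery must be a per-row dict keyed by column name. A standalone sketch of the same parsing logic as above (the sample input line is illustrative):

```python
import re

# Column names taken from the parse_method snippet above.
COLUMNS = ('date', 'volume', 'open', 'close', 'high', 'low', 'adjclose')

def parse_line(string_input):
    # Strip quotes and the trailing CRLF, then split on commas.
    values = re.split(',', re.sub('\r\n', '', re.sub('"', '', string_input)))
    # Pair each column name with its value to get the dict BigQuery expects.
    return dict(zip(COLUMNS, values))

row = parse_line('"2021-01-04",1000,3.5,3.7,3.9,3.4,3.7\r\n')
print(row)  # -> {'date': '2021-01-04', 'volume': '1000', ...}
```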
Rows passed to WriteToBigQuery should be in the form of a dict {"column_name": value, ...}. Maybe you can find more info in the sink docs: beam.apache.org/releases/pydoc/2.28.0/… - Tudor Plugaru