I would like to call a beam.io.Write(beam.io.BigQuerySink(..))
operation from within a ParDo function to generate a separate BigQuery table for each key in the PCollection
(I'm using the Python SDK). Here are two similar threads, which unfortunately didn't help:
1) https://stackguides.com/questions/31156774/about-key-grouping-with-groupbykey
2) Dynamic table name when writing to BQ from dataflow pipelines
When I execute the following code, the rows for the first key are inserted into BigQuery and then the pipeline fails with the error below. I would really appreciate any suggestions on what I'm doing wrong or how to fix it.
Pipeline code:
rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query))
class par_upload(beam.DoFn):
    def process(self, context):
        key, value = context.element
        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
            beam.io.BigQuerySink(
                'PROJECT-NAME:analytics.first_table',  # will be replaced by a dynamic name based on the key
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
        ### End block ######
        return [value]
### Following part works fine ###
filtered = (rows
            | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics')
            | 'apply_projection' >> beam.Map(apply_projection, projection_fields)
            | 'group_by_key' >> beam.GroupByKey()
            | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
            | 'flat_map' >> beam.FlatMap(lambda l: l)  # this step is just for testing
            )
### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
    beam.io.BigQuerySink(
        'PROJECT-NAME:analytics.another_table',
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
Error message:
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table.
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})}
ERROR:root:Error while visiting par_upload_to_bigquery
Traceback (most recent call last):
File "split_events.py", line 137, in <module>
run()
File "split_events.py", line 132, in run
p.run()
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
return self.runner.run(self)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run
super(DirectPipelineRunner, self).run(pipeline)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
pipeline.visit(RunVisitor(self))
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
self._root_transform().visit(visitor, self, visited)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
part.visit(visitor, pipeline, visited)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
visitor.visit_transform(self)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
self.runner.run_transform(transform_node)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
return m(transform_node)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper
func(self, pvalue, *args, **kwargs)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo
runner.process(v)
File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483)
File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311)
File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677)
File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process
return self.run(self.dofn.process, context, args, kwargs)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run
result = method(context, *args, **kwargs)
File "split_events.py", line 73, in process
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__
return _MaterializePValues(cache).visit(result)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit
return self._pvalue_cache.get_unwindowed_pvalue(node)
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue
return [v.value for v in self.get_pvalue(pvalue)]
File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue
value_with_refcount = self._cache[self.key(pvalue)]
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']"
Edit (after the first answer):
I didn't realise my value needs to be a PCollection.
I've changed my code to this now (which is probably very inefficient):
key_pipe = p | 'pipe_' + key >> beam.Create(value)
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..))
This now works fine locally, but not with the BlockingDataflowPipelineRunner :-(
The pipeline fails with the following error:
JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331)
op.start()
File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193)
def start(self):
File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499)
fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5)
… DoFn that is running as part of that pipeline. The fact this works in the DirectRunner is a bug. As the other threads suggest, if you want to do this kind of data-dependent write, you'll need to interact directly with the BigQuery API rather than using a BigQuerySink for now. – Ben Chambers
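For context, here is a minimal sketch (not the original pipeline's code) of the kind of data-dependent write the comment points to: the DoFn talks to the BigQuery API directly instead of applying a BigQuerySink inside process(). It is written against a newer Beam Python SDK (process receives the element directly rather than a context) and assumes a recent google-cloud-bigquery client is available on the workers; the class name, project/dataset arguments and error handling are placeholders, and table creation / schema handling is omitted.

import apache_beam as beam
from google.cloud import bigquery  # assumption: google-cloud-bigquery installed on the workers


class WritePerKeyToBQ(beam.DoFn):  # hypothetical name, not from the original pipeline
    def __init__(self, project, dataset):
        self.project = project
        self.dataset = dataset

    def start_bundle(self):
        # Create the client on the worker; client objects generally aren't picklable.
        self.client = bigquery.Client(project=self.project)

    def process(self, element):
        key, values = element
        # One table per key; the table is assumed to exist already
        # (creation and schema handling are omitted in this sketch).
        table_id = '%s.%s.%s' % (self.project, self.dataset, key)
        errors = self.client.insert_rows_json(table_id, list(values))
        if errors:
            raise RuntimeError('BigQuery insert failed for %s: %s' % (key, errors))
        yield key

In the pipeline above, something like this would take the place of the par_upload step, e.g. ... | 'group_by_key' >> beam.GroupByKey() | 'per_key_bq_write' >> beam.ParDo(WritePerKeyToBQ('PROJECT-NAME', 'analytics')).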