I would like to call a beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function to generate a separate BigQuery table for each key in the PCollection (I'm using the Python SDK). Here are two similar threads, which unfortunately didn't help:

1) https://stackguides.com/questions/31156774/about-key-grouping-with-groupbykey

2) Dynamic table name when writing to BQ from dataflow pipelines

When I execute the following code, the rows for the first key get inserted into BigQuery and then the pipeline fails with the error below. I would really appreciate any suggestions on what I'm doing wrong or how to fix it.

Pipeline code:

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query))

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
                        beam.io.BigQuerySink(
                            'PROJECT-NAME:analytics.first_table',  # will be replaced by a dynamic name based on key
                            schema=schema,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
                            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                            )
            )
        ### End block ######
        return [value] 


### Following part works fine ###
filtered = (rows    | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
                    | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
                    | 'group_by_key' >> beam.GroupByKey() 
                    | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
                    | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing
                )

### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            'PROJECT-NAME:analytics.another_table',
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Attempting refresh to obtain initial access_token
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table.
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})}
ERROR:root:Error while visiting par_upload_to_bigquery
Traceback (most recent call last):
  File "split_events.py", line 137, in <module>
    run()
  File "split_events.py", line 132, in run
    p.run()
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo
    runner.process(v)
  File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483)
  File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311)
  File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677)
  File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process
    return self.run(self.dofn.process, context, args, kwargs)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run
    result = method(context, *args, **kwargs)
  File "split_events.py", line 73, in process
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__
    return _MaterializePValues(cache).visit(result)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit
    return self._pvalue_cache.get_unwindowed_pvalue(node)
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue
    return [v.value for v in self.get_pvalue(pvalue)]
  File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue
    value_with_refcount = self._cache[self.key(pvalue)]
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']"

Edit (after the first answer):

I didn't realise my value needs to be a PCollection.

I've changed my code to this now (which is probably very inefficient):

key_pipe = p | 'pipe_' + key >> beam.Create(value)
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..))

This now works fine locally, but not with BlockingDataflowPipelineRunner :-(

The pipeline fails with the following error:

    JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work
    work_executor.execute()
  File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331)
    op.start()
  File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193)
    def start(self):
  File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499)
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5)
I think the first answer is still correct: you can't add more steps to a pipeline from within a DoFn that is running as part of that pipeline. The fact that this works in the DirectRunner is a bug. As the other threads suggest, if you want to do this kind of data-dependent write, you'll need to interact directly with the BigQuery API rather than using a BigQuerySink for now. (Ben Chambers)

1 Answer

In the similar threads, the only suggestion for writing to BigQuery from within a ParDo was to call the BigQuery API directly or through a client library.

The code that you wrote applies a pipeline transform, beam.io.Write(beam.io.BigQuerySink(...)), inside the process method of a DoFn. That transform expects to be applied to a PCollection, like filtered in the working code example. This is not the case in the non-functioning code: value there is just the plain iterable of rows that GroupByKey produced for one key.
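
To make the shape of that data concrete, here is a plain-Python imitation of what GroupByKey hands to process. No Beam is involved; group_by_key and the sample rows are made up for illustration only.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Imitate GroupByKey: collect the values of each key into one list."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Sort by key only, so the output order is deterministic.
    return sorted(grouped.items(), key=lambda kv: kv[0])


# Hypothetical (key, row) pairs, similar to the output of apply_projection.
pairs = [
    ("analytics", {"id": 1}),
    ("billing", {"id": 2}),
    ("analytics", {"id": 3}),
]

grouped = group_by_key(pairs)
# Each element is (key, rows): rows is an ordinary list of dicts,
# not a PCollection, so it cannot feed another pipeline step.
```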

I think the easiest option would be to take a look at the gcloud-python BigQuery method insert_data() and call it from inside your DoFn.
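
As a rough sketch of that approach, the helper below sends the rows of one key to a table named after the key, in batches. It assumes you wrap whichever client call you end up using (for example insert_data()) in a callable that takes a table id and a list of rows and returns a list of errors. That callable, the helper names and the events_ prefix are hypothetical; none of them are part of Beam or gcloud-python.

```python
import re


def table_name_for_key(key, prefix="events_"):
    """Map an arbitrary key to a table name made of letters, digits and underscores."""
    return prefix + re.sub(r"[^A-Za-z0-9_]", "_", str(key))


def _flush(insert_rows, table_id, batch):
    """Send one batch and fail loudly if the client reports errors."""
    errors = insert_rows(table_id, batch)
    if errors:
        raise RuntimeError("insert into %s failed: %r" % (table_id, errors))


def write_rows_for_key(insert_rows, dataset, key, rows, batch_size=500):
    """Write all rows of one key to its own table and return the table id.

    insert_rows is a hypothetical callable (table_id, list_of_rows) -> errors
    that stands in for the real client call.
    """
    table_id = "%s.%s" % (dataset, table_name_for_key(key))
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            _flush(insert_rows, table_id, batch)
            batch = []
    if batch:
        _flush(insert_rows, table_id, batch)
    return table_id


# Inside the DoFn from the question this would be used roughly as:
#     key, value = context.element
#     write_rows_for_key(my_insert_rows, 'analytics', key, value)
# where my_insert_rows is your own wrapper around the BigQuery client.
```

Passing the insert function in as an argument keeps the batching and naming logic testable without credentials. The real client is only needed on the workers.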