
I'm trying to write a pipeline that reads a stream from Pub/Sub and writes it to BigQuery, using Google Cloud Dataflow with Apache Beam. I have this code:

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

topic = 'projects/???/topics/???'
table = '???.???'

gcs_path = "gs://???"

with beam.Pipeline(runner="DataflowRunner", argv=[
        "--project", "???",
        "--staging_location", ("%s/staging_location" % gcs_path),
        "--temp_location", ("%s/temp" % gcs_path),
        "--output", ("%s/output" % gcs_path)
    ]) as p:
    (p
     | 'winderow' >> beam.WindowInto(FixedWindows(60))
     | 'hello' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic)
     | 'hello2' >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table)))

But I'm getting this error when running it:

No handlers could be found for logger "oauth2client.contrib.multistore_file"
ERROR:root:Error while visiting winderow
Traceback (most recent call last):
  File ".\main.py", line 20, in <module>
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 339, in run
    return self.runner.run(self)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 296, in run
    super(DataflowRunner, self).run(pipeline)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 138, in run
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 367, in visit
    self._root_transform().visit(visitor, self, visited)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 710, in visit
    part.visit(visitor, pipeline, visited)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 713, in visit
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 133, in visit_transform
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 176, in run_transform
    return m(transform_node)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 526, in run_ParDo
    input_step = self._cache.get_pvalue(transform_node.inputs[0])
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 252, in get_pvalue
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 226, in _ensure_pvalue_has_real_producer
    while real_producer.parts:
AttributeError: 'NoneType' object has no attribute 'parts'

Is this a problem with the code or configuration? How can I get it working?

Pub/Sub is an unbounded source, so you should add "--streaming" to your argv, and you don't need the p.run().wait_until_finish() part because an unbounded stream will never finish. - Davos

Answer


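I don't have experience with windowed pipelines yet, but from what I understand of the concept, windows are applied to your input data, not set up as part of the pipeline itself. WindowInto needs an upstream PCollection to window, so it has to come after the Pub/Sub read. The traceback is consistent with this: the runner fails while visiting 'winderow', when it looks up the producer of that step's input and finds none ('NoneType' object has no attribute 'parts').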

So your code should probably look like this:

with beam.Pipeline(runner="DataflowRunner", argv=[
        "--project", "???",
        "--staging_location", ("%s/staging_location" % gcs_path),
        "--temp_location", ("%s/temp" % gcs_path),
        "--output", ("%s/output" % gcs_path),
        # Pub/Sub is an unbounded source, so the job has to run in streaming mode
        "--streaming"
    ]) as p:
    # Read first, then window the resulting PCollection, then write it out
    (p
     | 'hello' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic)
     | 'winderow' >> beam.WindowInto(FixedWindows(60))
     | 'hello2' >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table)))
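
To illustrate why the window has to come after the source: a fixed window is chosen from each element's own timestamp, so there is nothing to window until a source has produced elements. The sketch below is plain Python, not the Beam API. `fixed_window` and `group_into_windows` are hypothetical helpers, the messages are made up, and the start-time arithmetic is assumed to match what FixedWindows(60) does with its default offset of 0 (each window covers [start, start + 60) seconds).

```python
# Plain-Python illustration (NOT the Beam API) of how a fixed window is
# picked for an element from that element's own timestamp.

def fixed_window(timestamp, size=60, offset=0):
    """Return the (start, end) bounds of the fixed window containing timestamp.

    Windows are `size` seconds long and aligned to `offset`;
    start is inclusive, end is exclusive.
    """
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


def group_into_windows(elements, size=60):
    """Group hypothetical (timestamp, value) pairs by their fixed window."""
    windows = {}
    for ts, value in elements:
        windows.setdefault(fixed_window(ts, size), []).append(value)
    return windows


# Hypothetical messages: (event time in seconds, payload).
messages = [(5, "a"), (59, "b"), (60, "c"), (125, "d")]
for window, values in sorted(group_into_windows(messages).items()):
    print(window, values)
```

This prints (0, 60) ['a', 'b'], then (60, 120) ['c'], then (120, 180) ['d']. The elements at 59 and 60 seconds land in different windows because each window's end is exclusive.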

The official Apache Beam repository also has some examples of windowed operations.