I am trying to read some data from BigQuery and some data from the file system using the code below:

apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

But when I run this pipeline, I get the following error:

Traceback (most recent call last):
  File "/etl/dataflow/etlTXLPreprocessor.py", line 125, in <module>
    run()
  File "/etl/dataflow/etlTXLPreprocessor.py", line 120, in run
    p.run().wait_until_finish()
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/etl/dataflow/venv3/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)
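The last frame of this traceback is an argument-count mismatch: sdf_direct_runner.py calls DoFnInvoker.create_invoker(signature, process_invocation=False) with one positional argument, while the create_invoker it reaches (which appears to be Cython-compiled, judging by the frame format) requires at least two. The failure happens while the direct runner is applying its transform overrides (pipeline.replace_all), before any data is read. The sketch below is only a simplified pure-Python model of that kind of mismatch; the functions and the parameter name `required_extra` are hypothetical stand-ins, not Beam's code, and plain Python words the resulting TypeError differently from the compiled module.

```python
# Hypothetical stand-ins, NOT Beam's real code: a simplified model of the
# argument-count mismatch shown in the last traceback frame.


def create_invoker(signature, required_extra, process_invocation=True):
    """Callee whose signature demands a second positional argument."""
    return signature, required_extra, process_invocation


def stale_caller(signature):
    """Caller still written for a one-positional-argument form."""
    return create_invoker(signature, process_invocation=False)


def capture_error(signature):
    """Run the stale call and return the TypeError message, if any."""
    try:
        stale_caller(signature)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    # Prints a "missing 1 required positional argument" TypeError message.
    print(capture_error("signature"))
```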

But if I run my code like this, it works without errors:

apn = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()
apn1 = p | beam.io.Read(beam.io.BigQuerySource(query=apn_query, use_standard_sql=True)) | beam.combiners.ToList()

or like this

preprocess_rows = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())
preprocess_rows1 = p | beam.io.ReadFromText(file_path, coder=UnicodeCoder())

I am unable to figure out the cause of this error. Is it a limitation of Apache Beam that a pipeline can only read from one type of data source?


2 Answers


I am getting the same error when performing the same type of action: pulling in data from both BigQuery and the file system.

lines = p | "Read Input Parameters" >> ReadFromText(options.input)
past_posts = p | "Get Past Posts From BigQuery" >> Read(BigQuerySource(query=f"SELECT url FROM {full_bq_table_id}", use_standard_sql=False))

Error:

Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/craigslist_pipeline.py", line 14, in <module>
    full_bq_table_id=f"apartment-data-project:{dataset}.craigslist_posts"
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/pipeline/__init__.py", line 35, in run
    result = p.run()
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 461, in run
    self._options).run(False)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 182, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 413, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 443, in replace_all
    self._replace(override)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 340, in _replace
    self.visit(TransformUpdater(self))
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 503, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 939, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 1 more time]
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 942, in visit
    visitor.visit_transform(self)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 338, in visit_transform
    self._replace_if_needed(transform_node)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/pipeline.py", line 301, in _replace_if_needed
    new_output = replacement_transform.expand(input_node)
  File "/Users/ianmitchell/Documents/Personal Projects/Craigslist/env/lib/python3.7/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 87, in expand
    invoker = DoFnInvoker.create_invoker(signature, process_invocation=False)
  File "apache_beam/runners/common.py", line 360, in apache_beam.runners.common.DoFnInvoker.create_invoker
TypeError: create_invoker() takes at least 2 positional arguments (1 given)

I'm also wondering why you cannot pull from different sources.


This is a bug in the direct runner in Apache Beam 2.19. It has been fixed, but the fix has not been released yet. Downgrade apache-beam to 2.16 (pip install apache-beam==2.16) and your pipeline will work.
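If you pin an older release as suggested, a small guard can stop a pipeline early when it is launched under the release reported as broken. This is a minimal sketch: treating only 2.19.x as affected is an assumption based solely on this answer (not on Beam's release notes), and the helper names are hypothetical. It reads apache_beam.__version__ and skips the check entirely if Beam is not installed.

```python
# Assumption: only 2.19.x is treated as affected, per this answer alone.
AFFECTED_MINOR_RELEASES = {(2, 19)}


def parse_major_minor(version):
    """Return (major, minor) from a version string such as '2.19.0'."""
    parts = version.split(".")
    return int(parts[0]), int(parts[1])


def is_reported_affected(version):
    """True if the version falls in a release reported as broken."""
    return parse_major_minor(version) in AFFECTED_MINOR_RELEASES


def check_installed_beam():
    """Raise early if the installed apache-beam is a reported-broken release."""
    try:
        import apache_beam
    except ImportError:
        return None  # Beam is not installed, so there is nothing to check.
    version = apache_beam.__version__
    if is_reported_affected(version):
        raise RuntimeError(
            f"apache-beam {version} was reported to fail in the direct runner "
            "(create_invoker TypeError); try: pip install apache-beam==2.16"
        )
    return version
```

Calling check_installed_beam() at the top of run(), before building the pipeline, turns the deep traceback into a one-line message that names the workaround.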