I'm trying to read a collection of XML files from a GCS bucket and process them where each element in the collection is a string representing the whole file but I can't find a decent example on how to accomplish this, nor can I understand it from the Apache Beam documentation which is mainly about the Java version.
My current pipeline looks like this:
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
(p
| 'Read from a File' >> beam.io.Read(training_files_folder)
| 'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='title:STRING,text:STRING,id:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
And the error message I'm receiving is:
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1664, in <module>
main()
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1658, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files\JetBrains\PyCharm Community Edition 2018.2.1\helpers\pydev\pydevd.py", line 1068, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 135, in <module>
run()
File "C:/Users/Tomer/PycharmProjects/hyperpartisan/cloud-version/data_ingestion.py", line 130, in run
p.run().wait_until_finish()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 421, in wait_until_finish
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 398, in await_completion
self._executor.await_completion()
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 444, in await_completion
six.reraise(t, v, tb)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 341, in call
finish_state)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\executor.py", line 366, in attempt_call
side_input_values)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 109, in get_evaluator
input_committed_bundle, side_inputs)
File "C:\Users\Tomer\anaconda\envs\hyperpartisan\lib\site-packages\apache_beam\runners\direct\transform_evaluator.py", line 283, in __init__
self._source.pipeline_options = evaluation_context.pipeline_options
AttributeError: 'str' object has no attribute 'pipeline_options'
Any assistance is much appreciated. Thanks Tomer
Solved the first issue: turns out this doesn't work with the DirectRunner, changing the runner to DataFlowRunner and replacing Read with ReadFromText solved the exception:
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
(p
| 'Read from a File' >> beam.io.ReadFromText(training_files_folder)
| 'String To BigQuery Row' >> beam.Map(lambda s:
data_ingestion.parse_method(s))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='title:STRING,text:STRING,id:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run().wait_until_finish()
But now I saw that this approach gives me a line from each file as the pipeline element whereas I wanted to have the whole file as a string as each element. Not sure how to do that. I found this post but it's in java and not sure how it works with python at all and gcs version specifically.
So looks like the ReadFromText won't work for my usecase and I have no idea how to create a pipeline of files otherwise.
Solution: Thanks to the assist from Ankur, I revised the code to include the required steps to convert from a list of MatchResult objects which is what the GCSFileSystem Returns to a pCollection of Strings, each representing one file.
p = beam.Pipeline(options=PipelineOptions(pipeline_args))
gcs = GCSFileSystem(PipelineOptions(pipeline_args))
gcs_reader = GCSFileReader(gcs)
(p
| 'Read Files' >> beam.Create([m.metadata_list for m in gcs.match([training_files_folder])])
| 'metadata_list to filepath' >> beam.FlatMap(lambda metadata_list: [metadata.path for metadata in metadata_list])
| 'string To BigQuery Row' >> beam.Map(lambda filepath:
data_ingestion.parse_method(gcs_reader.get_string_from_filepath(filepath)))
| 'Write to BigQuery' >> beam.io.Write(
beam.io.BigQuerySink(
known_args.output,
schema='title:STRING,text:STRING,id:STRING',
# Creates the table in BigQuery if it does not yet exist.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
# Appends data to the BigQuery table
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
p.run().wait_until_finish()
The code uses this helper class to read the gcs files:
class GCSFileReader:
"""Helper class to read gcs files"""
def __init__(self, gcs):
self.gcs = gcs
def get_string_from_filepath(self,filepath):
with self.gcs.open(filepath) as reader:
res = reader.read()
return res