Use case
I want to parse multiple files from Cloud Storage and insert the results into a BigQuery table.
Selecting one particular file to read works fine. However, I'm struggling when I switch from that single file to all files in the bucket by using the * glob pattern.
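For reference, the only thing that changes between the working and the failing variant is the value passed to ReadFromText (the single file name below is just a placeholder; the glob is the actual default in the script):

# Works: reading one explicitly named object (the file name is a placeholder).
documents = pipe | 'read' >> ReadFromText('gs://foobar-sink/some-single-file.json')

# Blocks at submission time: reading every object in the bucket via a glob.
documents = pipe | 'read' >> ReadFromText('gs://foobar-sink/*')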
I'm executing the job like this:
python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test
This is my first Dataflow experiment and I'm not sure how to debug this, or what best practices there are for a pipeline like this.
Expected outcome
I would expect the job to be uploaded to the Cloud Dataflow runner, and that gathering the list of files and iterating over each of them would happen in the cloud at run time. I would expect to be able to pass the contents of all files in the same way as I do when reading a single file.
Actual outcome
The job already blocks at the point of submitting it to the Cloud Dataflow runner.
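As a standalone sanity check (outside of Beam) I can at least count the objects the glob would have to cover; this is a hypothetical debugging snippet, assuming the google-cloud-storage client library is installed and credentials are set up:

# Not part of the pipeline: count the objects that gs://foobar-sink/* would match.
from google.cloud import storage

client = storage.Client(project='foobar')
bucket = client.bucket('foobar-sink')
print(sum(1 for _ in bucket.list_blobs()))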
Contents of batch.py
"""A metric sink workflow."""
from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions


class ExtractDatapointsFn(beam.DoFn):
    """
    Parse json documents and extract the metrics datapoints.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.

        Args:
            element: the element being processed.
        Returns:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check.
            document = json.loads(element)
            if "DataPoints" not in document:
                raise ValueError("missing DataPoints")
        except ValueError:
            self.total_invalid.inc(1)
            return

        for point in document["DataPoints"]:
            yield point


def run(argv=None):
    """
    Main entry point; defines and runs the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(GoogleCloudOptions)
    pipe = beam.Pipeline(options=pipeline_options)

    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()

    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()