
Use case

I want to parse multiple files from Cloud storage and insert the results into a BigQuery table.

Reading one particular file works fine. However, I'm struggling when I replace the single file with the * glob pattern to include all files.

I'm executing the job like this:

python batch.py --project foobar --job_name foobar-metrics --runner DataflowRunner --staging_location gs://foobar-staging/dataflow --temp_location gs://foobar-staging/dataflow_temp --output foobar.test

This is my first Dataflow experiment, and I'm not sure how to debug it or what the best practices are for a pipeline like this.

Expected outcome

I would expect the job to be uploaded to the Dataflow runner, and the listing of files and iteration over each one to happen in the cloud at run time. I would also expect to be able to pass the contents of all files through the pipeline in the same way as when reading one file.

Actual outcome

The job already blocks at the point of submitting it to the Cloud Dataflow runner.

Contents of batch.py

"""A metric sink workflow."""

from __future__ import absolute_import

import json
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions

class ExtractDatapointsFn(beam.DoFn):
    """
    Parse json documents and extract the metrics datapoints.
    """
    def __init__(self):
        super(ExtractDatapointsFn, self).__init__()
        self.total_invalid = Metrics.counter(self.__class__, 'total_invalid')

    def process(self, element):
        """
        Process json that contains metrics of each element.

        Args:
            element: the element being processed.

        Returns:
            unmarshaled json for each metric point.
        """
        try:
            # Catch parsing errors as well as our custom key check.
            document = json.loads(element)
            if "DataPoints" not in document:
                raise ValueError("missing DataPoints")
        except ValueError:
            self.total_invalid.inc(1)
            return

        for point in document["DataPoints"]:
            yield point

def run(argv=None):
    """
    Main entry point; defines and runs the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://foobar-sink/*',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help=(
                            'Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
                            'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(GoogleCloudOptions)
    pipe = beam.Pipeline(options=pipeline_options)

    # Read the json data and extract the datapoints.
    documents = pipe | 'read' >> ReadFromText(known_args.input)
    metrics = documents | 'extract datapoints' >> beam.ParDo(ExtractDatapointsFn())

    # BigQuery sink table.
    _ = metrics | 'write bq' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema='Path:STRING, Value:FLOAT, Timestamp:TIMESTAMP',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))

    # Actually run the pipeline (all operations above are deferred).
    result = pipe.run()
    result.wait_until_finish()

    total_invalid_filter = MetricsFilter().with_name('total_invalid')
    query_result = result.metrics().query(total_invalid_filter)
    if query_result['counters']:
        total_invalid_counter = query_result['counters'][0]
        logging.info('number of invalid documents: %d', total_invalid_counter.committed)
    else:
        logging.info('no invalid documents were found')

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Could you show what error message you see when trying to submit the job? - Pablo
There is no output whatsoever. The submit blocks and I can see local network activity. My theory is that it expands the glob locally, but I don't know for sure. I cancelled it manually after 3 minutes. - baloo
So the job is never created? - Pablo
@Pablo Yeah, it never gets as far as submitting it. Nothing shows up in Google Dataflow, whereas selecting one particular file submits the job properly. - baloo
Looks like Dataflow job submission is running into issues (or taking a long time) when trying to estimate the size of the glob. Do you know how many files the glob will expand into? - chamikara

1 Answer


We estimate the size of sources at job submission so that the Dataflow service can use that information when initializing the job (for example, to determine the initial number of workers). To estimate the size of a glob, we first need to expand it. This can take some time (I believe several minutes for GCS) if the glob expands into more than 100k files. We'll look into ways to improve the user experience here.
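
To make the cost concrete, here is a minimal sketch of that kind of estimate against the local filesystem. It is not Beam's actual implementation and it does not touch GCS. The names estimate_glob_size and demo are hypothetical, and it assumes Python 3 and the standard library only. The point is that each matched file needs its own metadata lookup, so the work grows with the number of matches.

```python
import glob
import os
import tempfile


def estimate_glob_size(pattern):
    """Expand a local glob and return (file_count, total_bytes).

    Hypothetical helper for illustration only. Every matched file
    requires a separate size lookup, so the cost scales with the
    number of files the pattern expands into.
    """
    file_count = 0
    total_bytes = 0
    for path in glob.iglob(pattern):
        if os.path.isfile(path):
            file_count += 1
            total_bytes += os.path.getsize(path)
    return file_count, total_bytes


def demo():
    """Create three 10-byte files in a temp dir and estimate their size."""
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(3):
            with open(os.path.join(tmp, "part-%d.json" % i), "w") as handle:
                handle.write("x" * 10)
        return estimate_glob_size(os.path.join(tmp, "*"))
```

Calling demo() walks three files. With a glob that matches hundreds of thousands of objects in a remote bucket, the same loop would issue that many lookups before the job could be submitted, which would be consistent with the blocking described in the question.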