
I'm trying to convert the Cloud Dataflow "Wordcount" Python example into a templated version by modifying the pipeline options to use runtime parameters, as instructed in the docs:

def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""

  class WordcountTemplatedOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)
  wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)

  # Read the text file[pattern] into a PCollection.
  etc. etc.

The problem arises when creating and staging the template. When I execute the command, the output is:

INFO:root:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Finished the size estimation of the input at 1 files. Estimation took 0.288088083267 seconds
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Starting finalize_write threads with num_shards: 1, batches: 1, num_threads: 1
INFO:root:Renamed 1 shards in 0.13 seconds.
INFO:root:number of empty lines: 1663
INFO:root:average word length: 4

and no file is produced under template_location (gs://[YOUR_BUCKET_NAME]/templates/mytemplate).

I suspected the command was running the pipeline locally against the "default" input file, so I removed the "default" line from the --input argument, but then I got this error:

raise BeamIOError('Unable to get the Filesystem', {path: e})
apache_beam.io.filesystem.BeamIOError: Unable to get the Filesystem with exceptions {None: AttributeError("'NoneType' object has no attribute 'strip'",)}

There is no official Python Dataflow templated sample (the only snippet I was able to find was this one, which looks much like the code above).

Am I missing something?

Thanks!

I'm seeing the exact same issue: in Python, when I execute the command listed in the Google documentation, the pipeline runs right away and nothing appears in my templates folder in GCS. Upvoted to hopefully get some traction. - andre622
Hello @Neurus, I see that you have opened a case with Cloud Support concerning this question. Were you able to generate your template successfully? If so, please post your solution here to help the community and to gain further reputation. - Jordan

1 Answer


Thanks to Google Cloud Support - I was able to fix the issue. In summary:

  1. Clone the latest wordcount.py example (I had been using an older version):

    git clone https://github.com/apache/beam.git

  2. The Google team updated the tutorial, so simply follow the code instructions. Make sure to include the @classmethod _add_argparse_args to be able to receive arguments during runtime, and use the new options when reading from the text file:

    wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
    lines = p | 'read' >> ReadFromText(wordcount_options.input)

  3. Generate the template as instructed.

You should now see the template under the template_location directory.
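
To illustrate why step 2 matters, here is a minimal sketch in plain Python of how I understand a templatable argument to behave: it is a placeholder whose value only becomes available when the job is launched, so the read step must hold on to the placeholder and look the value up later. This is a toy model, not Beam's implementation. `DeferredValue` and `build_read_step` are hypothetical names made up for this example (the `get` and `is_accessible` methods are only loosely modelled on Beam's value providers), and the `input.txt` path is a placeholder.

```python
class DeferredValue(object):
    """Toy stand-in for a runtime parameter (hypothetical, not a Beam class)."""

    def __init__(self, name, default=None):
        self.name = name
        self._default = default
        self._value = None
        self._resolved = False

    def resolve(self, value=None):
        """Simulates job launch: use the supplied value, else the default."""
        self._value = self._default if value is None else value
        self._resolved = True

    def is_accessible(self):
        """True only once the job has been launched."""
        return self._resolved

    def get(self):
        """Return the value, or fail if called before launch."""
        if not self._resolved:
            raise RuntimeError(
                '%s is not available until the job runs' % self.name)
        return self._value


def build_read_step(source):
    """Build a step that looks up the path only when it is executed."""
    def read_step():
        return 'reading from ' + source.get()
    return read_step


# "Template creation": the step is built, but the path is never read here.
input_path = DeferredValue(
    'input', default='gs://dataflow-samples/shakespeare/kinglear.txt')
step = build_read_step(input_path)

# "Job launch": the value is supplied (placeholder path), then the step runs.
input_path.resolve('gs://[YOUR_BUCKET_NAME]/input.txt')
print(step())
```

Calling `input_path.get()` before `resolve()` raises in this model. That is the analogue of reading the input while the template is being built, instead of passing `wordcount_options.input` to `ReadFromText` and letting it be read at run time.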

Thanks!