I can't create a custom Google Cloud Dataflow template using the wordcount example following the instructions here: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
I get an error relating to the RuntimeValueProvider being unaccessible. What am I doing wrong?
My main function wordcount.py
:
"""A word-counting workflow."""
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def __init__(self):
self.words_counter = Metrics.counter(self.__class__, 'words')
self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
self.word_lengths_dist = Metrics.distribution(
self.__class__, 'word_len_dist')
self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')
def process(self, element):
"""Returns an iterator over the words of this element.
The element is a line of text. If the line is blank, note that, too.
Args:
element: the element being processed
Returns:
The processed element.
"""
text_line = element.strip()
if not text_line:
self.empty_line_counter.inc(1)
words = re.findall(r'[\w\']+', text_line, re.UNICODE)
for w in words:
self.words_counter.inc()
self.word_lengths_counter.inc(len(w))
self.word_lengths_dist.update(len(w))
return words
def run(argv=None):
"""Main entry point; defines and runs the wordcount pipeline."""
class WordcountOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
# Use add_value_provider_argument for arguments to be templatable
# Use add_argument as usual for non-templatable arguments
parser.add_value_provider_argument(
'--input',
default='gs://wordcount_custom_template/input/example.txt',
help='Path of the file to read from')
parser.add_value_provider_argument(
'--output',
required=True,
default='gs//wordcount_custom_template/output/count',
help='Output file to write results to.')
pipeline_options = PipelineOptions(['--output', 'some/output_path'])
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountOptions)
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(wordcount_options.input)
# Count the occurrences of each word.
def count_ones(word_ones):
(word, ones) = word_ones
return (word, sum(ones))
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(count_ones))
# Format the counts into a PCollection of strings.
def format_result(word_count):
(word, count) = word_count
return '%s: %d' % (word, count)
output = counts | 'format' >> beam.Map(format_result)
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | 'write' >> WriteToText(wordcount_options.output)
result = p.run()
result.wait_until_finish()
# Do not query metrics when creating a template which doesn't run
if (not hasattr(result, 'has_job') # direct runner
or result.has_job): # not just a template creation
empty_lines_filter = MetricsFilter().with_name('empty_lines')
query_result = result.metrics().query(empty_lines_filter)
if query_result['counters']:
empty_lines_counter = query_result['counters'][0]
logging.info('number of empty lines: %d', empty_lines_counter.result)
word_lengths_filter = MetricsFilter().with_name('word_len_dist')
query_result = result.metrics().query(word_lengths_filter)
if query_result['distributions']:
word_lengths_dist = query_result['distributions'][0]
logging.info('average word length: %d', word_lengths_dist.result.mean)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
My template creation code:
#!/usr/bin/env bash
python wordcount.py \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://wordcount_custom_template/staging \
--temp_location gs://wordcount_custom_template/temp \
--template_location gs://wordcount_custom_template/template/wordcount_template
The error I receive:
raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: 'gs://wordcount_custom_template/input/example.txt') not accessible
I don't really understand what this error message means as gs://wordcount_custom_template/input/example.txt
is accessible
Full stacktrace:
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at 0x108e5fa28> ====================
INFO:root:==================== <function lift_combiners at 0x108e5ff50> ====================
INFO:root:==================== <function expand_gbk at 0x108e5fde8> ====================
INFO:root:==================== <function sink_flattens at 0x108e5fe60> ====================
INFO:root:==================== <function greedily_fuse at 0x108e5f848> ====================
INFO:root:==================== <function sort_stages at 0x108e5faa0> ====================
INFO:root:Running (ref_AppliedPTransform_read/Read_3)+((ref_AppliedPTransform_split_4)+((ref_AppliedPTransform_pair_with_one_5)+(group/Write)))
INFO:root:start <DataOutputOperation group/Write >
INFO:root:start <DoOperation pair_with_one output_tags=['out']>
INFO:root:start <DoOperation split output_tags=['out']>
INFO:root:start <ReadOperation read/Read source=SourceBundle(weight=1.0, source=<apache_beam.io.textio._TextSource object at 0x108cfcd50>, start_position=None, stop_position=None)>
Traceback (most recent call last):
File "wordcount.py", line 121, in <module>
run()
File "wordcount.py", line 100, in run
result = p.run()
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 369, in run
self.to_runner_api(), self.runner, self._options).run(False)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/pipeline.py", line 382, in run
return self.runner.run_pipeline(self)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
return runner.run_pipeline(pipeline)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 215, in run_pipeline
return self.run_via_runner_api(pipeline.to_runner_api())
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 218, in run_via_runner_api
return self.run_stages(*self.create_stages(pipeline_proto))
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 837, in run_stages
pcoll_buffers, safe_coders).process_bundle.metrics
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 938, in run_stage
self._progress_frequency).process_bundle(data_input, data_output)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1110, in process_bundle
result_future = self._controller.control_handler.push(process_bundle)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1003, in push
response = self.worker.do_instruction(request)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 185, in do_instruction
request.instruction_id)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 202, in process_bundle
processor.process_bundle(instruction_id)
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 286, in process_bundle
op.start()
File "apache_beam/runners/worker/operations.py", line 227, in apache_beam.runners.worker.operations.ReadOperation.start
File "apache_beam/runners/worker/operations.py", line 228, in apache_beam.runners.worker.operations.ReadOperation.start
File "apache_beam/runners/worker/operations.py", line 229, in apache_beam.runners.worker.operations.ReadOperation.start
File "apache_beam/runners/worker/operations.py", line 231, in apache_beam.runners.worker.operations.ReadOperation.start
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 197, in get_range_tracker
return self._get_concat_source().get_range_tracker(start_position,
File "/Users/chris/.pyenv/versions/cl2/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 123, in _f
raise error.RuntimeValueProviderError('%s not accessible' % obj)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input, type: str, default_value: 'gs://wordcount_custom_template/input/example.txt') not accessible
Another thing I don't understand is how can it be that I specify the DataflowRunner yet the DirectRunner is called as shown in the stacktrace?