2
votes

I have a pipeline using the Python SDK 2.2.0 for Apache Beam.

This pipeline is almost a typical word count: I have pairs of names in the format ("John Doe, Jane Smith", 1), and I'm trying to figure out how many times each pair of names appears together, like this:

p_collection
            | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
            | "GroupByKey" >> beam.GroupByKey()
            | "AggregateGroups" >> beam.Map(lambda (pair, ones): (pair, sum(ones)))
            | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})

When I run this code locally, with a small dataset, it works perfectly.

But when I deploy it to Google Cloud DataFlow, I get the following error:

An exception was raised when trying to execute the workitem 423109085466017585 : Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work work_executor.execute() File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute op.start() File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start def start(self): File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start with self.scoped_start_state: File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start with self.shuffle_source.reader() as reader: File "dataflow_worker/shuffle_operations.py", line 69, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start self.output(windowed_value) File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation, consumer).process(windowed_value) File "dataflow_worker/shuffle_operations.py", line 233, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process self.output(wvalue.with_value((k, wvalue.value))) File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation, consumer).process(windowed_value) File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process with self.scoped_process_state: File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process self.dofn_receiver.receive(o) File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented raise File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process self.do_fn_invoker.invoke_process(windowed_value) File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process self.output_processor.process_outputs( File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs self.main_receivers.receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive cython.cast(Operation, consumer).process(windowed_value) File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process with self.scoped_process_state: File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process self.dofn_receiver.receive(o) File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive self.process(windowed_value) File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process self._reraise_augmented(exn) File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented raise new_exn, None, original_traceback File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process self.do_fn_invoker.invoke_process(windowed_value) File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process self.output_processor.process_outputs( File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs self.main_receivers.receive(windowed_value) File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive self.update_counters_start(windowed_value) File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start self.opcounter.update_from(windowed_value) File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from self.do_sample(windowed_value) File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample self.coder_impl.get_estimated_size_and_observables(windowed_value)) File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables def get_estimated_size_and_observables(self, value, nested=False): File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables self._value_coder.get_estimated_size_and_observables( File "apache_beam/coders/coder_impl.py", line 518, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables values[i], nested=nested or i + 1 < len(self._coder_impls))) RuntimeError: KeyError: 0 [while running 'Transform/Format']

Looking at the source code of where this error pops up from, I thought it could be cause due to the fact that some of the names contain some weird encoded characters, so in a desperate act I tried using the .encode("ascii", errors="ignore").decode() you see on the code, but no luck.

Any ideas as to why this pipeline executes successfully locally, but fails on DataFlow runner?

Thanks!

3
I'm in the same position as you. Pipeline runs perfectly locally, but fails on Dataflow with the exact same RuntimeError. I'm working with NumPy arrays and the like, not text, so I'm also guessing it's something more than character encoding.deepyaman
In your particular case, any reason you can't replace the GroupByKey and AggregateGroups steps with beam.CombinePerKey(sum))? wordcount_minimal follows this pattern. I'm not sure if it will make your issue go away, but it would simplify your pipeline and avoid an explicit GroupByKey, if that's an issue. Alternatively, you could use Count.PerElement and essentially get rid of the PairWithOne step, too (you'd need to do the string combining, though, if you need it with that format).deepyaman
@user1093967, I have no idea why, but replacing GroupByKey and AggregateGroups with CombinePerKey(sum) made the problem disappear! So thank you for the tip :)Hannon Queiroz
@user1093967, did you make sure to add NumPy as a dependency that has to be installed on the workers? I'm just guessing here, but I've had a problem before that the dependency was installed in my lib folder, but not declared in the required_packages in my setup.py.Hannon Queiroz
I haven't explicitly specified it as a dependency, but I don't think that's the issue, because I'm still able to run transforms that depend on NumPy on Dataflow.deepyaman

3 Answers

2
votes

This isn't so much of a fix to my problem as it is avoiding the problem in the first place, but it did make my code run, thanks to the suggestion of user1093967 in the comments.

I just replaced the GroupByKey and the AggregateGroups by a CombinePerKey(sum) step and the problem didn't occur anymore.

p_collection
        | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
        | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})

I'd be happy to hear why it works, though.

1
votes

In some cases, like my own, you need the intermediate grouped values, so CombinePerKey isn't ideal. In this more general case, you can replace GroupByKey() with CombineValues(ToListCombineFn()).

I'm not confident as to why this works while GroupByKey doesn't. My guess is that consuming the _UnwindowedValues iterable returned by GroupByKey like a list fails in a parallel execution environment. I was doing something like:

... | beam.GroupByKey()
    | beam.Map(lambda k_v: (k_v[0], foo(list(k_v[1]))))
    | ...

where foo requires the full, indexable list and is not easily composable. I'm not sure why this sort of restriction would have caused issues for you, though; sum can operate on an iterable.

This solution isn't ideal in that (I believe) you lose some parallelization with the ToList conversion. That being said, at least it's an option if anybody else faces this same issue!

0
votes

GroupByKey groups all elements with the same key and produces multiple PCollections. The next stage receives an Iterable collecting all elements with the same key. The important note is that this Iterable is evaluated lazily, at least when GroupByKey is executed on the Dataflow runner. This means that elements are loaded into memory on demand — when requested from the iterator.

CombinePerKey on the other hand, also groups all elements with the same key, but does an aggregation before emitting a single value.

pcollection_obj
        | "MapWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
        | "GroupByKeyAndSum" >> beam.CombinePerKey(sum)
        | "CreateDictionary" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]})

@Hannon César I hope this answers your question. Cheers !!