I have a pipeline using the Python SDK 2.2.0 for Apache Beam.
This pipeline is almost a typical word count: I have pairs of names, which I turn into elements of the form ("John Doe, Jane Smith", 1), and I'm trying to figure out how many times each pair of names appears together, like this:
(p_collection
 | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair).encode("ascii", errors="ignore").decode(), 1))
 | "GroupByKey" >> beam.GroupByKey()
 | "AggregateGroups" >> beam.Map(lambda (pair, ones): (pair, sum(ones)))
 | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]}))
When I run this code locally, with a small dataset, it works perfectly.
But when I deploy it to Google Cloud Dataflow, I get the following error:
An exception was raised when trying to execute the workitem 423109085466017585 :
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 69, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 233, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 154, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 415, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 86, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 339, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 340, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn, None, original_traceback
  File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 189, in apache_beam.runners.common.SimpleInvoker.invoke_process
    self.output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 480, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 84, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 90, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 63, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 81, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 730, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 739, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._value_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 518, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
    values[i], nested=nested or i + 1 < len(self._coder_impls)))
RuntimeError: KeyError: 0 [while running 'Transform/Format']
Looking at the source code where this error originates, I thought it might be caused by some of the names containing oddly encoded characters, so in a desperate act I tried the .encode("ascii", errors="ignore").decode() you see in the code above, but no luck.
Any ideas as to why this pipeline executes successfully locally but fails on the Dataflow runner?
Thanks!
Comments:

Have you tried replacing the GroupByKey and AggregateGroups steps with beam.CombinePerKey(sum)? wordcount_minimal follows this pattern. I'm not sure if it will make your issue go away, but it would simplify your pipeline and avoid an explicit GroupByKey, if that's an issue. Alternatively, you could use Count.PerElement and essentially get rid of the PairWithOne step, too (you'd need to do the string combining, though, if you need it with that format). - deepyaman

Replacing GroupByKey and AggregateGroups with CombinePerKey(sum) made the problem disappear! So thank you for the tip :) - Hannon Queiroz

setup.py. - Hannon Queiroz
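For reference, here is a minimal sketch of the rewrite the comments suggest. It assumes the same p_collection and the Python 2.7 / Beam 2.2.0 environment from the question, omits the rest of the pipeline setup, and drops the ascii encode/decode workaround since the question reports it didn't help:

import apache_beam as beam

# Variant 1: CombinePerKey(sum) replaces the explicit
# GroupByKey + AggregateGroups pair with a single combining step.
(p_collection
 | "PairWithOne" >> beam.Map(lambda pair: (', '.join(pair), 1))
 | "CountPairs" >> beam.CombinePerKey(sum)
 | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]}))

# Variant 2: Count.PerElement removes the PairWithOne step as well;
# the name-joining still has to happen first, as the commenter notes.
(p_collection
 | "JoinNames" >> beam.Map(lambda pair: ', '.join(pair))
 | "CountPairs" >> beam.combiners.Count.PerElement()
 | "Format" >> beam.Map(lambda element: {'pair': element[0], 'pair_count': element[1]}))

Besides sidestepping the coder error hit here, a combiner lets the runner pre-aggregate values on each worker before the shuffle, so it is generally cheaper than a GroupByKey followed by a summing Map.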