
I have an Apache Beam pipeline running on Google Dataflow that reads gzip-compressed JSON data from GCS, transforms it, and loads it into Google BigQuery. The pipeline works as expected on a sample batch of data, but when I try to run it over the whole dataset (~2.4M files), it sometimes raises a confusing error that, after a few occurrences, crashes the job.

The error is:

Error message from worker: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py", line 380, in process
    source = list(source.split(float('inf')))[0].source
IndexError: list index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 647, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1030, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1122, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1030, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1122, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py", line 380, in process
    source = list(source.split(float('inf')))[0].source
IndexError: list index out of range [while running 'GetData/ReadAllFiles/ReadRange']

I understand that the stage involved in the error is GetData:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

files = (p
         | 'Init' >> beam.Create([files_pattern])
         | 'GetData' >> ReadAllFromText())

Where p is the pipeline object and files_pattern is a glob of the form gs://{bucket}/{prefix}/*.json.gz.
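For reference, this is roughly how p and files_pattern are set up; the bucket and prefix below are placeholders, not the actual values:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder bucket/prefix; the real values are project-specific.
files_pattern = 'gs://my-bucket/my-prefix/*.json.gz'

# Runner, project, region, etc. come from the command-line pipeline options.
p = beam.Pipeline(options=PipelineOptions())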

The error is confusing to me because it says nothing about my pipeline code itself, and, as mentioned, the pipeline works on the sample batch.

My process is very similar to the one in Avoid recomputing size of all Cloud Storage files in Beam Python SDK. I checked the resources mentioned there, but I still can't resolve the error. What am I missing here? I couldn't find any resources about this specific error.

This looks like it might be a bug in filebasedsource. Two questions: first, do you have any empty files? Second, can you try setting min_bundle_size=1 on your ReadAllFromText transform? – danielm
Yes, I checked that there are some empty files. I could try with min_bundle_size=1; what should I expect by changing the default? – osjerick
The problem here looks like a bug in ReadAllFromText. The issue is that when it splits the work up across the workers, it will crash if one of those splits is empty. min_bundle_size=1 should prevent that from happening, but should not otherwise impact the job much (see the sketch after these comments). – danielm
Ok. I'll try with that value then and be back with news. – osjerick
Instructions are here - cwiki.apache.org/confluence/display/BEAM/Python+Tips. This implementation is shared by all runners, so you should be able to reproduce with DirectRunner (writing to GCS) as well. – chamikara
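For anyone following along, a minimal sketch of the min_bundle_size=1 suggestion applied to the read stage, assuming p and files_pattern are defined as in the question and the rest of the pipeline is unchanged:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

# min_bundle_size=1 avoids producing empty splits when the read work is
# divided across workers; it should not otherwise change the job much.
files = (p
         | 'Init' >> beam.Create([files_pattern])
         | 'GetData' >> ReadAllFromText(min_bundle_size=1))

The same code can also be run with the DirectRunner against a small GCS prefix to try to reproduce the issue outside Dataflow, as suggested above.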

1 Answer


I recommend using Dataflow Shuffle. Instead of keeping the shuffle data on the workers' persistent disks, the data is kept in the Dataflow service backend. Since the error message mentions an index being out of range and, per the comments, the job ran fine on less data, what likely happens is that you run out of memory or storage.
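As a sketch, Dataflow Shuffle for batch jobs can be enabled through the shuffle_mode=service experiment when building the pipeline options; the project, region, and bucket below are placeholders, and the chosen region must support Dataflow Shuffle:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder project/region/bucket. Enabling the Dataflow Shuffle service
# moves shuffle data off the workers' persistent disks and into the backend.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
    experiments=['shuffle_mode=service'],
)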

Furthermore, keep in mind that you can use the Google-provided Dataflow templates, although these templates are in Java rather than Python. One of them is GCS Text to BigQuery.