I have an Apache Beam pipeline running on Google Dataflow that reads gzip-compressed JSON data from GCS, transforms it, and loads it into Google BigQuery. The pipeline works as expected on a sample batch of data, but when I run it over the whole dataset (~2.4M files), it sometimes raises a confusing error that crashes the job after a few occurrences.
The error is:
Error message from worker: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py", line 380, in process
    source = list(source.split(float('inf')))[0].source
IndexError: list index out of range

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 647, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1030, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1122, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1030, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1122, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 553, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsource.py", line 380, in process
    source = list(source.split(float('inf')))[0].source
IndexError: list index out of range [while running 'GetData/ReadAllFiles/ReadRange']
I understand that the stage involved in the error is GetData:
import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

files = (p
         | 'Init' >> beam.Create([files_pattern])
         | 'GetData' >> ReadAllFromText())
Where p is the pipeline object and files_pattern is a glob of the form gs://{bucket}/{prefix}/*.json.gz.
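For what it's worth, the files the glob matches can be inspected up front with Beam's FileSystems API. A minimal sketch (the bucket and prefix below are placeholders for my real values):

# Sketch: check what the glob actually matches before launching the job.
from apache_beam.io.filesystems import FileSystems

files_pattern = 'gs://my-bucket/my-prefix/*.json.gz'  # placeholder

match = FileSystems.match([files_pattern])[0]
print('matched files:', len(match.metadata_list))

# Zero-byte objects are worth flagging: an empty file gives the
# text reader nothing to split into ranges.
empty = [m.path for m in match.metadata_list if m.size_in_bytes == 0]
print('empty files:', len(empty))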
The error is confusing to me because it says nothing about my pipeline code itself, and, as I said, the same pipeline works as expected on the sample batch.
My process is very similar to the one in Avoid recomputing size of all Cloud Storage files in Beam Python SDK. I checked the resources mentioned there, but I still can't resolve the error. What am I missing here? I couldn't find any resources about this specific error.
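One detail that may matter: the failing frame runs source = list(source.split(float('inf')))[0].source, which raises exactly when split() yields no sources for a file. My working assumption is that a handful of the ~2.4M objects are empty. A sketch of a pre-filter under that assumption (non_empty_files is my own helper, not a Beam API):

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.textio import ReadAllFromText

def non_empty_files(pattern):
    # Expand the glob once and keep only objects with a positive size.
    for metadata in FileSystems.match([pattern])[0].metadata_list:
        if metadata.size_in_bytes > 0:
            yield metadata.path

files = (p
         | 'Init' >> beam.Create([files_pattern])
         | 'FilterEmpty' >> beam.FlatMap(non_empty_files)
         | 'GetData' >> ReadAllFromText())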
min_bundle_size=1, what should I expect by changing the default? – osjerick
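For context, min_bundle_size is a keyword argument of ReadAllFromText (it defaults to 0 in the SDK versions I've used), so that suggestion amounts to:

files = (p
         | 'Init' >> beam.Create([files_pattern])
         | 'GetData' >> ReadAllFromText(min_bundle_size=1))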