Let me preface this by saying I am still getting to know my way around Beam's Python SDK and GCP Dataflow!
The issue: the pipeline has been working great for almost all use cases, with no errors I can complain about, so this is really a question about possible bottlenecks or optimizations. I've noticed that execution time jumps to over 3 hours when the input is a gzipped file of roughly 50 MB, and I'm not sure whether there is any way to speed that part up. Below is a screenshot of the log warnings I see repeatedly before the job eventually completes successfully.
[Screenshot: log output details from Dataflow]
Here is a relevant snippet of the pipeline:
import argparse
import json

import apache_beam as beam

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Beam Pipeline: example'
    )
    parser.add_argument('--input-glob',
                        help='Cloud Path To Raw Data',
                        required=True)
    parser.add_argument('--output-bq-table',
                        help='BQ Native {dataset}.{table}')
    known_args, pipeline_args = parser.parse_known_args()

    with beam.Pipeline(argv=pipeline_args) as p:
        json_raw = (p
                    | 'READING RAW DATA' >> beam.io.ReadFromText(known_args.input_glob)
                    | 'JSON-ing' >> beam.Map(json.loads)
                    )
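As far as the gzip handling goes, my understanding is that ReadFromText detects the compression on its own (compression_type defaults to CompressionTypes.AUTO, which goes by the .gz extension). For reference, here is a minimal, standalone sketch of just the read step with the compression spelled out explicitly; the bucket path is a placeholder, not my actual input:

import json

import apache_beam as beam
from apache_beam.io.filesystem import CompressionTypes

with beam.Pipeline() as p:
    # Same read step as above, but with the gzip compression made explicit
    # instead of relying on AUTO detection from the .gz suffix.
    json_raw = (p
                | 'READING RAW DATA' >> beam.io.ReadFromText(
                    'gs://example-bucket/raw/*.gz',  # placeholder pattern
                    compression_type=CompressionTypes.GZIP)
                | 'JSON-ing' >> beam.Map(json.loads))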
Extra info:
- I am using Airflow's DataflowHook to launch this (a rough sketch of the launch is included after the execution params below).
- I've played around with different machine types, hoping that throwing more compute power at the problem would make it go away, but so far no luck.
- The pipeline is launched with the following execution params:
--runner=DataflowRunner \
--project=example-project \
--region=us-east4 \
--save_main_session \
--temp_location=gs://example-bucket/temp/ \
--input-glob=gs://example-bucket/raw/ \
--machine_type=n1-standard-16 \
--job_name=this-is-a-test-insert
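For completeness, this is roughly how the job gets launched from Airflow. It is a simplified sketch, assuming the contrib DataFlowPythonOperator (which wraps the DataflowHook under the hood); the DAG wiring, file paths, and exact operator kwargs are placeholders and may differ slightly between Airflow versions:

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
from airflow.utils.dates import days_ago

# Sketch only: DAG id, schedule, and paths are placeholders.
with DAG(dag_id='example_dataflow_insert',
         schedule_interval=None,
         start_date=days_ago(1)) as dag:

    launch_pipeline = DataFlowPythonOperator(
        task_id='this-is-a-test-insert',  # used to derive the Dataflow job name
        py_file='gs://example-bucket/pipelines/pipeline.py',  # placeholder
        dataflow_default_options={
            'project': 'example-project',
            'region': 'us-east4',
            'temp_location': 'gs://example-bucket/temp/',
        },
        options={
            # passed through to the pipeline script as --key=value flags
            'input-glob': 'gs://example-bucket/raw/',
            'machine_type': 'n1-standard-16',
        },
    )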