I am building a Dataflow pipeline with the Apache Beam Python SDK 2.15.0. At several stages of this pipeline, I need to join additional data to each element.
All of this additional data is read from Avro files on Google Cloud Storage (the Dataflow job and the GCS bucket are in the same zone), converted into key-value tuples with beam.Map, and then passed to the DoFn as a side input using pvalue.AsDict(). The side input data does not change while the pipeline is running.
The first join (side input size ~1 MB) performs well. However, the second join, whose side input is about 50 MB, performs very poorly.
The Dataflow execution graph clearly shows what is causing the poor performance: roughly 90% of the time consumed by my ParDo step is spent reading the side input. The amount of data read from the side input exceeds its actual size by orders of magnitude, even though I only use four worker nodes.
Is there anything I can do to prevent this? Do I need to configure the worker cache size somehow? Would it be better to load the additional data in the setup method of my DoFn instead of passing it as a side input?
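The setup-based alternative can be sketched as a plain-Python model of the DoFn lifecycle: Beam calls setup() on a DoFn instance before it processes bundles, and process() once per element. This is a minimal sketch under stated assumptions, not a Beam implementation: the class SetupJoin, the loader callable load_table, and the output field side_data are all hypothetical, and it assumes the main-input elements carry the same three key columns (KEYCOL1..KEYCOL3) as the side-input records, which the question does not state. In a real pipeline the class would subclass beam.DoFn and load_table would read bigger_file.avro itself.

```python
from typing import Any, Callable, Dict, Hashable, Iterator, Optional


class SetupJoin:
    """Plain-Python model of a DoFn that builds its lookup table in setup().

    Hypothetical: a real version would subclass beam.DoFn, and load_table
    would read the Avro file from GCS.
    """

    def __init__(self, load_table: Callable[[], Dict[Hashable, Any]]) -> None:
        self._load_table = load_table
        self._table: Optional[Dict[Hashable, Any]] = None

    def setup(self) -> None:
        # Runs once per instance, so the table is built once per instance
        # and then reused for every element that instance processes.
        self._table = self._load_table()

    def process(self, element: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
        # Assumption: the main-input element has the same three key columns.
        key = (element["KEYCOL1"], element["KEYCOL2"], element["KEYCOL3"])
        match = self._table.get(key)
        if match is not None:
            # "side_data" is a hypothetical field name for the joined record.
            yield {**element, "side_data": match}
        else:
            # No match: pass the element through unchanged.
            yield element
```

One trade-off to keep in mind: each DoFn instance holds its own copy of the table, and a worker may run more than one instance, so memory use grows with the number of instances.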
Here is how I prepare the side inputs:
import apache_beam as beam
from apache_beam import pvalue

sideinput_1 = pvalue.AsDict(p | "Read side input data 1" >> beam.io.ReadFromAvro("gs://bucket/small_file.avro", min_bundle_size=0, validate=False, use_fastavro=True)
    | "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"], x["VALUE"])))
# Preparing data for the later (second) join
sideinput_2 = pvalue.AsDict(p | "Read side input data 2" >> beam.io.ReadFromAvro("gs://bucket/bigger_file.avro", min_bundle_size=0, validate=False, use_fastavro=True)
    | "Prepare side input data 2" >> beam.Map(lambda x: ((x["KEYCOL1"], x["KEYCOL2"], x["KEYCOL3"]), x)))
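The two Map lambdas above can also be written as named functions, which makes the shape of the (key, value) pairs that AsDict receives explicit and easy to test. The names to_kv_1 and to_kv_2 are hypothetical; the logic mirrors the lambdas exactly.

```python
from typing import Any, Dict, Tuple


def to_kv_1(record: Dict[str, Any]) -> Tuple[Any, Any]:
    """Same as the 'Prepare sideinput 1' lambda: key by KEY, keep only VALUE."""
    return record["KEY"], record["VALUE"]


def to_kv_2(record: Dict[str, Any]) -> Tuple[Tuple[Any, Any, Any], Dict[str, Any]]:
    """Same as the 'Prepare side input data 2' lambda: composite key, whole record as value."""
    # A tuple of hashable values is itself hashable, so it works as a dict key.
    return (record["KEYCOL1"], record["KEYCOL2"], record["KEYCOL3"]), record
```

Either function can be passed directly, e.g. beam.Map(to_kv_2), in place of the lambda. Note that to_kv_2 keeps the entire record, key columns included, as the value; if Join2 only needs a few columns, mapping to just those would make the materialized side input smaller.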
Using the side inputs:
matching = p | "Read address data" >> beam.io.Read(beam.io.BigQuerySource(query=sql_addr, use_standard_sql=True)) \
    | "Join w/ sideinput1" >> beam.ParDo(Join1(), sideinput_1).with_outputs('unmatched', main='matched')
result = matching["matched"] | "Join Sideinput 2" >> beam.ParDo(Join2(), sideinput_2)
Each DoFn's process method just looks up the element's key in the side input and, depending on whether there is a match, adds some additional data to the element.
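A minimal plain-Python sketch of the lookup logic described above, written as a generator that tags each result instead of using Beam's tagged outputs. Assumptions: the main-input key field is called KEY and the joined value is attached under a hypothetical side_value field; neither name comes from the question. Inside a real Join1.process(self, element, lookup), the matched branch would be a plain yield to the main output and the other branch would yield pvalue.TaggedOutput('unmatched', element).

```python
from typing import Any, Dict, Iterator, Tuple


def join_with_lookup(element: Dict[str, Any], lookup: Dict[Any, Any],
                     key_field: str = "KEY") -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield ('matched', enriched) or ('unmatched', element) for one element."""
    key = element.get(key_field)
    if key in lookup:
        # Match: copy the element and attach the side-input value.
        yield "matched", {**element, "side_value": lookup[key]}
    else:
        # No match: route the untouched element to the 'unmatched' output.
        yield "unmatched", element
```

A dict lookup is constant time on average, so the per-element work here is small; that fits the observation that the step's time goes into reading the side input rather than into the lookup itself.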