
I am building a Dataflow pipeline using the Python SDK 2.15.0. At several stages in this pipeline, I need to join additional data to each element.

All of this additional data is read from Avro files on Google Cloud Storage (the same region is used for Dataflow and the GCS bucket), organized as key-value tuples via Map, and then passed to the DoFn as a side input using pvalue.AsDict(). The side input data does not change during execution of the pipeline.

The first join (side input size ~1 MB) goes really well. The second join, however, suffers from bad performance; its side input is about 50 MB in size.

The Dataflow execution graph clearly shows what's causing the bad performance: roughly 90% of the time consumed by my ParDo step is spent reading the side input. The amount of data read from the side input exceeds its actual size by orders of magnitude, even though I only use four worker nodes.

Is there anything I can do to prevent this? Do I need to configure the worker cache size somehow? Would it be better to prepare the additional data in the setup method of my DoFn instead of passing it as a side input?

Here is how I prepare the side inputs:

import apache_beam as beam
from apache_beam import pvalue

sideinput_1 = pvalue.AsDict(
    p | "Read side input data 1" >> beam.io.ReadFromAvro(
            "gs://bucket/small_file.avro",
            min_bundle_size=0, validate=False, use_fastavro=True)
      | "Prepare sideinput 1" >> beam.Map(lambda x: (x["KEY"], x["VALUE"])))

# Preparing data for later join
sideinput_2 = pvalue.AsDict(
    p | "Read side input data 2" >> beam.io.ReadFromAvro(
            "gs://bucket/bigger_file.avro",
            min_bundle_size=0, validate=False, use_fastavro=True)
      | "Prepare side input data 2" >> beam.Map(
            lambda x: ((x["KEYCOL1"], x["KEYCOL2"], x["KEYCOL3"]), x)))

Using the sideinputs:


matching = (
    p | "Read address data" >> beam.io.Read(
            beam.io.BigQuerySource(query=sql_addr, use_standard_sql=True))
      | "Join w/ sideinput1" >> beam.ParDo(Join1(), sideinput_1).with_outputs(
            'unmatched', main='matched'))

result = matching["matched"] | "Join Sideinput 2" >> beam.ParDo(Join2(), sideinput_2)

The DoFn's process method just contains a lookup of the element's key in the side input and, depending on whether there is a match, adds some additional data to the element.

Ok, this seems to get way better once I pass the dict as a list using pvalue.AsList(). According to the source code documentation (beam.apache.org/releases/pydoc/2.4.0/…), AsList forces materialization of the side input; AsDict apparently does not. Is there any way to force materialization for dictionaries as well? To be able to pass the dict as a list, I wrapped it in a list that contains only one element: the dictionary. I don't like this solution, but its performance is really much better. - Thomas W.

1 Answer


Ok, some months and a few discussions later, and with the experience gained since, let me give this another shot:

I am pretty sure the performance issue with the side input comes down to memory swapping. The pipeline contains several other, quite similar joins whose side inputs are significantly smaller; those run through with reasonable wall time, and the ratio of I/O bytes to side-input bytes is roughly the same for all of these joins.

The performance of the affected join improved by orders of magnitude when I switched the implementation from a ParDo with a side input to the CoGroupByKey transform.

One more word about side-input size and when to prefer CoGroupByKey over a DoFn with a side input:

The great blog entry "Guide to common Cloud Dataflow use-case patterns" states that one might use ParDo with side inputs of up to 100 MB in streaming mode and 1 GB in batch mode:

Note: If possible, use SideInputs for any activity where one of the join tables is actually small — around 100MB in stream mode or under a 1GB in batch mode. This will perform much better [...].

I guess there is no general threshold that fits every scenario; it likely depends heavily on your pipeline, the machine type, the number of workers, and so forth. In my case, I suppose the threshold is lower due to the high complexity of the pipeline, which consists of ~40 transforms including several joins.

So if you run into the same issue when doing a join with a ParDo and a side input, you might want to give the CoGroupByKey transform a try.