
I have a Dataflow pipeline that builds an index (key-value pairs) and computes some metrics (such as the number of values per key). The input data totals about 60 GB and is stored on GCS; the pipeline has about 126 workers allocated. According to Stackdriver, all workers are at about 6% CPU utilization.

The pipeline seems to make no progress despite having 126 workers, and based on wall time the bottleneck seems to be a simple counting step that follows a GroupByKey. While every other step has on average spent less than 1 hour, the counting step has already taken 50 days of wall time. The logs contain no helpful information, only warnings.

The counting step was implemented following the corresponding step in the WordCount example:

```python
def count_keywords_per_product(self, key_and_group):
    key, group = key_and_group
    # Iterate over the grouped values once to count them.
    count = 0
    for _ in group:
        count += 1

    # Update the pipeline's custom metrics.
    self.stats.product_counter.inc()
    self.stats.keywords_per_product_dist.update(count)

    return (key, count)
```

The preceding step "Group keywords" is a simple beam.GroupByKey() transformation.

Please advise what might be the reason and how this can be optimized.

Current resource metrics:
| Metric | Value |
|---|---|
| Current vCPUs | 126 |
| Total vCPU time | 1,753.649 vCPU hr |
| Current memory | 472.5 GB |
| Total memory time | 6,576.186 GB hr |
| Current PD | 3.08 TB |
| Total PD time | 43,841.241 GB hr |
| Current SSD PD | 0 B |
| Total SSD PD time | 0 GB hr |
| Total Shuffle data processed | 1.03 TB |
| Billable Shuffle data processed | 529.1 GB |

The pipeline steps, including the counting step, are shown below:

[Image: Dataflow pipeline graph showing all steps, including the counting step]

GCP support told me that the results of a grouping are not cached in Python when they are re-iterated. This might cause issues similar to the reported problem. As a workaround, users could use the Java SDK, or update their code to reduce re-iteration. In my case, the only form of re-iteration is that the PCollection containing the GroupByKey results is written to a file and, in a parallel branch, used to compute the counters. – kpax

Here is a solution that worked for me: stackoverflow.com/a/55527648/3745936. TL;DR: do not attempt to reuse GroupByKey output in the Python SDK for Apache Beam. It is either a bug or behavior that should be documented and prevented by the SDK. – kpax

1 Answer


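The best way to get a count per key here is to use a combine operation, because it can alleviate the problem of hot keys: the runner can partially combine values on each worker before the shuffle, so no single worker has to receive and iterate over every value of a hot key.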

Try replacing your GroupByKey + ParDo with beam.combiners.Count.PerKey(), or a similar combine transform that suits your use case.