I decided to give it a go using a custom CombineFn to determine the minimum and maximum per key. Then, I join the results with the input data using CoGroupByKey and apply the desired mapping to normalize the values.
"""Normalize PCollection values."""
import logging
import argparse

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class MinMaxFn(beam.CombineFn):
    """Track the per-key minimum and maximum."""

    def create_accumulator(self):
        # Start from opposite extremes so the first input sets both bounds
        # (initializing max to 0 would break for negative values).
        return (float('inf'), float('-inf'))

    def add_input(self, min_max, input):
        (current_min, current_max) = min_max
        return min(current_min, input), max(current_max, input)

    def merge_accumulators(self, accumulators):
        # Reduce the partial (min, max) pairs coming from parallel bundles.
        mins, maxs = zip(*accumulators)
        return min(mins), max(maxs)

    def extract_output(self, min_max):
        return min_max


def normalize(element):
    """Rescale each value to [0, 1] using its key's (min, max)."""
    key, (values, min_max) = element
    min_value, max_value = min_max[0]
    return key, [float(v - min_value) / (max_value - min_value) for v in values]


def run(argv=None):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--output',
                        dest='output',
                        required=True,
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    p = beam.Pipeline(options=pipeline_options)

    pc = p | 'Create test data' >> beam.Create(
        [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)])
    minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())
    merged = (pc, minmax) | 'Join PCollections' >> beam.CoGroupByKey()
    normalized = merged | 'Normalize values' >> beam.Map(normalize)
    normalized | 'Write results' >> WriteToText(known_args.output)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
The snippet can be run with python SCRIPT_NAME.py --output OUTPUT_FILENAME. My test data, grouped by key, is:
('foo', [1, 5])
('bar', [5, 9, 2])
The CombineFn will return the per-key min and max:
('foo', (1, 5))
('bar', (2, 9))
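If it helps, the CombineFn lifecycle can be sanity-checked outside the pipeline by driving it by hand (a hypothetical standalone check, not part of the snippet):

fn = MinMaxFn()
acc = fn.create_accumulator()
for v in [5, 9, 2]:
    acc = fn.add_input(acc, v)
# Merging with an empty accumulator should not change the result.
print(fn.extract_output(fn.merge_accumulators([acc, fn.create_accumulator()])))  # (2, 9)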
The output of the join/cogroup by key operation:
('foo', ([1, 5], [(1, 5)]))
('bar', ([5, 9, 2], [(2, 9)]))
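Note that CoGroupByKey wraps the combined (min, max) pair in a one-element list, since it groups all values per key from each input PCollection. The normalize helper above relies on that shape; unpacking one element by hand looks like this:

key, (values, min_max) = ('bar', ([5, 9, 2], [(2, 9)]))
min_value, max_value = min_max[0]  # single combined value per key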
And after normalizing:
('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])
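As a quick arithmetic check, 'bar' has (min, max) = (2, 9), so each value v maps to (v - 2) / (9 - 2):

[float(v - 2) / (9 - 2) for v in [5, 9, 2]]  # [0.42857142857142855, 1.0, 0.0]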
This was just a simple test, so I'm sure it can be optimized for the mentioned volume of data, but it seems to work as a starting point. Take into account that further considerations might be needed (e.g. avoiding a division by zero when min = max).
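For example, a minimal sketch of such a guard, assuming keys where min = max should map to 0.0 (that choice is arbitrary):

def safe_normalize(element):
    key, (values, min_max) = element
    min_value, max_value = min_max[0]
    span = max_value - min_value
    # Constant keys have span == 0; map them to 0.0 instead of dividing.
    return key, [0.0 if span == 0 else float(v - min_value) / span for v in values]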