
I am using Apache Beam via the Python SDK and have the following problem:

I have a PCollection with approximately 1 million entries. Each entry is a list of 2-tuples [(key1,value1),(key2,value2),...] with length ~150. I need to find the max and min values across all entries of the PCollection for each key, in order to normalize the values.

Ideally, it would be good to obtain a PCollection with a list of tuples [(key,max_value,min_value),...]; then it would be easy to proceed with the normalization to get [(key1,norm_value1),(key2,norm_value2),...], where norm_value = (value - min) / (max - min).

At the moment I can only do it separately for each key by hand, which is not very convenient and not scalable, so any suggestions would be helpful.
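To make the goal concrete, here is a plain-Python (non-Beam) sketch of the computation I am after; the data and key names are made up for illustration:

```python
# Plain-Python sketch of the desired result: per-key min/max across
# all entries, then normalization of each value within its key's range.
entries = [
    [('key1', 1.0), ('key2', 5.0)],
    [('key1', 3.0), ('key2', 9.0)],
]

# gather min and max per key across all entries
min_max = {}
for entry in entries:
    for key, value in entry:
        lo, hi = min_max.get(key, (float('inf'), float('-inf')))
        min_max[key] = (min(lo, value), max(hi, value))

# normalize each value: (value - min) / (max - min)
normalized = [
    [(k, (v - min_max[k][0]) / (min_max[k][1] - min_max[k][0]))
     for k, v in entry]
    for entry in entries
]
```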

Are your keys unique across entries? Is it possible for a key to occur in different entries (notice "key1" repeating in Entry1 and Entry2), i.e. {Entry1: [(key1, value1), (key2, value2)], Entry2: [(key1, value3), (key3, value4)]}? – Anupam Saini
@AnupamSaini There is a predefined list of keys [key1, key2, ..., keyn], where n ~ 150, and all entries contain all the keys from this list; only the values differ for each entry. So the idea of the question is to find the min and max values corresponding to each of these 150 keys across all the entries. – Philipp

1 Answer


I decided to give it a go using a custom CombineFn to determine the minimum and maximum per key. Then, join them with the input data using CoGroupByKey and apply the desired mapping to normalize the values.

"""Normalize PCollection values."""

import logging
import argparse

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


# custom CombineFn that outputs the min and max value per key
class MinMaxFn(beam.CombineFn):
  # start with an empty range; float infinities work for any numeric input
  # (the Python 2-only sys.maxint, or an initial max of 0, would break
  # on Python 3 and for negative values)
  def create_accumulator(self):
    return (float('inf'), float('-inf'))

  # update if the current value is a new min or max
  def add_input(self, min_max, element):
    (current_min, current_max) = min_max
    return min(current_min, element), max(current_max, element)

  # combine the partial (min, max) accumulators from parallel workers
  # into a single (min, max) tuple
  def merge_accumulators(self, accumulators):
    mins, maxs = zip(*accumulators)
    return min(mins), max(maxs)

  def extract_output(self, min_max):
    return min_max


def run(argv=None):
  """Main entry point; defines and runs the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--output',
                      dest='output',
                      required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  p = beam.Pipeline(options=pipeline_options)

  # create test data as a PCollection
  pc = p | 'Create test data' >> beam.Create(
      [('foo', 1), ('bar', 5), ('foo', 5), ('bar', 9), ('bar', 2)])

  # first run through data to apply custom combineFn and determine min/max per key
  minmax = pc | 'Determine Min Max' >> beam.CombinePerKey(MinMaxFn())

  # group input data by key and append corresponding min and max 
  merged = (pc, minmax) | 'Join Pcollections' >> beam.CoGroupByKey()

  # apply mapping to normalize values: norm_value = (value - min) / (max - min)
  # (a named function replaces the Python 2-only tuple-unpacking lambda)
  def normalize(element):
    key, (values, min_maxes) = element
    min_val, max_val = min_maxes[0]
    return key, [(v - min_val) / (max_val - min_val) for v in values]

  normalized = merged | 'Normalize values' >> beam.Map(normalize)

  # write results to output file
  normalized | 'Write results' >> WriteToText(known_args.output)

  result = p.run()
  result.wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The snippet can be run with python SCRIPT_NAME.py --output OUTPUT_FILENAME. My test data, grouped by key, is:

('foo', [1, 5])
('bar', [5, 9, 2])

The CombineFn will return the min and max per key:

('foo', (1, 5))
('bar', (2, 9))

The output of the join/cogroup by key operation:

('foo', ([1, 5], [(1, 5)]))
('bar', ([5, 9, 2], [(2, 9)]))

And after normalizing:

('foo', [0.0, 1.0])
('bar', [0.42857142857142855, 1.0, 0.0])

This was just a simple test, so I'm sure it can be optimized for the mentioned volume of data, but it seems to work as a starting point. Take into account that further considerations might be needed (e.g. avoiding division by zero if min = max).
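For instance, a minimal sketch of such a guard (the choice of mapping constant keys to 0.0 is an assumption; you might prefer 0.5 or skipping them):

```python
def safe_normalize(value, min_val, max_val):
    # when all values for a key are identical, (max - min) is zero;
    # map such constant keys to 0.0 instead of dividing by zero
    if max_val == min_val:
        return 0.0
    return (value - min_val) / (max_val - min_val)
```

This could replace the expression inside the list comprehension of the `normalize` step above.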