
I am running into a problem with Apache Beam side inputs, in particular AsSingleton. The code is very simple: I want to filter a list of orders and keep only those above the average amount, so I compute average_order_amount and use it as a side input to a Filter operation on the main 'orders' PCollection. However, I get the error "TypeError: '>' not supported between instances of 'float' and 'AsSingleton'". It looks as if the side input cannot be compared with a value of any type (here, float), and casting the singleton to float does not work either. The code below reproduces the error, which is raised when the order amount ord2[1] is compared with the side input:

import apache_beam as beam
p = beam.Pipeline()
orders = (p | beam.Create([(1, 12.6), (2, 8.7), (3, 41.2), (4, 15.0)]) )
average_order_amount = orders | 'Extract order amount' >>  beam.Map(lambda ord: float(ord[1])) | 'Find mean order amount' >> beam.combiners.Mean.Globally()
above_average_orders = orders | 'Keep only high orders' >> beam.Filter(lambda ord2: ord2[1] > beam.pvalue.AsSingleton(average_order_amount))
above_average_orders | 'Write results to file' >> beam.io.WriteToText('/tmp/high_orders')
p.run()

1 Answer


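AsSingleton only wraps the PCollection to mark it as a side input; it is not the computed value itself, so it cannot be compared with a float inside the lambda. Instead, pass the AsSingleton object to your Filter as an extra argument, like this: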

import apache_beam as beam
p = beam.Pipeline()
orders = (p 
          | beam.Create([(1, 12.6), (2, 8.7), (3, 41.2), (4, 15.0)]))
average_order_amount = beam.pvalue.AsSingleton(
    orders 
    | 'Extract order amount' >>  beam.Map(lambda ord: float(ord[1])) 
    | 'Find mean order amount' >> beam.combiners.Mean.Globally())

above_average_orders = (
    orders 
    | 'Keep only high orders' >> beam.Filter(lambda ord2, avg: ord2[1] > avg, 
                                             average_order_amount))
above_average_orders | 'Write results to file' >> beam.io.WriteToText('/tmp/high_orders')
p.run()

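You pass the AsSingleton object as an additional argument to Filter, after the function, and add a matching extra parameter to your lambda. When the pipeline runs, Beam fills in that parameter with the actual value of the side input (here, the mean as a float).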

The same pattern works for other transforms, such as Map, FlatMap, and ParDo.