So, I'm facing this seems-to-be-classic problem: extracting timeframed toppers from an unbounded stream,
using Apache Beam (Flink as the engine):

Assuming site+hits tuples as input:
{"aaa.com", 1001}, {"bbb.com", 21}, {"aaa.com", 1002}, {"ccc.com", 3001}, {"bbb.com", 22} ....
(Expected rate: 100K+ entries per hour)

The goal is to output the sites that account for >1% of total hits in each 1-hour timeframe,
i.e. for a 1-hour fixed window, pick the sites whose summed hits are >1% of the window's total hits.

So first, sum by key:
{"aaa.com", 2003}, {"bbb.com", 43}, {"ccc.com", 3001} ....
And finally output the >1%:
{"aaa.com"}, {"ccc.com"}
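
To make the expected arithmetic concrete, here is the same computation on the sample data as a plain-Python sketch (not Beam code, just the per-window logic):

```python
# Plain-Python sketch of one window's computation:
# sum hits per site, then keep sites whose share of the total exceeds 1%.
from collections import defaultdict

hits = [("aaa.com", 1001), ("bbb.com", 21), ("aaa.com", 1002),
        ("ccc.com", 3001), ("bbb.com", 22)]

per_site = defaultdict(int)
for site, n in hits:
    per_site[site] += n          # {"aaa.com": 2003, "bbb.com": 43, "ccc.com": 3001}

total = sum(per_site.values())   # 5047, so the 1% threshold is 50.47
toppers = [s for s, n in per_site.items() if n > total * 0.01]
print(sorted(toppers))           # ['aaa.com', 'ccc.com']
```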

Alternative:

1) Group + ParDo:
Fixed time windowing of 1 hour, group all elements, followed by a ParDo over the window's iterable that
calculates the total and outputs the >1% sites.
The cons seem to be that the whole aggregation runs in a single thread, and it also appears to require two iterations: one to get the total and one to find the >1% sites.
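
For what it's worth, the double iteration over the raw elements in option 1 can be avoided: one pass accumulates the per-site sums and the grand total together, and the second pass only walks the (much smaller) per-site dict. A plain-Python sketch of that accumulator (not Beam code):

```python
from collections import defaultdict

def window_toppers(elements, threshold=0.01):
    """Single pass over the raw elements accumulating per-site sums and the
    grand total; the filter then iterates only over the per-site dict."""
    per_site = defaultdict(int)
    total = 0
    for site, n in elements:
        per_site[site] += n
        total += n
    return {s for s, n in per_site.items() if n > total * threshold}

window = [("aaa.com", 1001), ("bbb.com", 21), ("aaa.com", 1002),
          ("ccc.com", 3001), ("bbb.com", 22)]
print(window_toppers(window))  # {'aaa.com', 'ccc.com'}
```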

2) GroupByKey + Combine
Fixed time windowing of 1 hour, GroupByKey using key=site, then apply Combine with a custom accumulator to sum the hits per key.

Although the Combine option (#2) seems more suitable,
I'm missing how to get the total sum per 1-hour window, which is needed to calculate the >1% elements:
Can the same window be used for two combines, one per key and one for the total hits in the window?
And what is the best approach to combine them both to make the >1% call per element?

10x

1 Answer

You can do this via side inputs. For instance, you'd do something like this (code in Python, but the Java equivalent is similar):


input_data = .... # ("aaa.com", 1001), ("bbb.com", 21), ("aaa.com", 1002), ("ccc.com", 3001), ("bbb.com", 22) ....

total_per_key = input_data | beam.CombinePerKey(sum)

global_sum_per_window = beam.pvalue.AsSingleton(
    input_data
    | beam.Values()
    | beam.CombineGlobally(sum).without_defaults())

def find_more_than_1pct(elem, global_sum):
  key, value = elem
  if value > global_sum * 0.01:
    yield elem

# The side input is passed as an extra argument to FlatMap:
over_1_pct_keys = total_per_key | beam.FlatMap(find_more_than_1pct,
                                               global_sum_per_window)

In this case, the global_sum_per_window side input will provide one value for each window, and total_per_key will have one value per key per window, so the comparison happens within matching windows.
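
To sanity-check the filtering step outside a pipeline, here is find_more_than_1pct run directly against the per-key sums from the question, with the side input replaced by a plain number (a sketch, not runnable Beam code):

```python
def find_more_than_1pct(elem, global_sum):
    # Same function as in the pipeline above; in Beam, global_sum arrives
    # as the AsSingleton side input.
    key, value = elem
    if value > global_sum * 0.01:
        yield elem

totals = [("aaa.com", 2003), ("bbb.com", 43), ("ccc.com", 3001)]
global_sum = sum(v for _, v in totals)  # 5047, as CombineGlobally(sum) would produce

over_1_pct = [e for t in totals for e in find_more_than_1pct(t, global_sum)]
print(over_1_pct)  # [('aaa.com', 2003), ('ccc.com', 3001)]
```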

Let me know if that works!