1
votes

I am trying to leverage parallelism to speed up a Top-10 window operation. My application consists of events that have a timestamp and a key and (i.e., Tuple2<Long,String>) and my goal is to produce the Top-10 most frequent keys for tumbling windows of 30 minutes(using event-time). To this end, my query consists of an ingress, a window, and an aggregation stage. In other words, my code will need to do something like the following:

DataStream<Tuple3<Long, String, Integer>> s = env
    .readTextFile("data.csv")
    .map(new MapFunction<String, Tuple3<Long, String, Integer>>() {
      @Override
      public Tuple3<Long, String, Integer> map(String s) throws Exception {
        String[] tokens = s.split(",");
        return new Tuple3<Long, String, Integer>(Long.parseLong(tokens[0]),
            tokens[1], 1);
      }})
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {
          @Override
          public long extractAscendingTimestamp(Tuple3<Long, String, Integer> t) {
            return t.f0;
          }}).setParallelism(1);

The above is the code to parse data from a CSV file and assign event time (i.e., Ingress). The reason that I set parallelism to 1 is because I need the events to appear ordered so that I can assign them to windows.

The tricky part comes next, in which I try to make speed up execution while producing correct (and ordered) window results.

Naive (Serial) Execution

The following code presents a solution that does not make use of any parallelism and produces a serial stream:

DataStream<Tuple2<Long, String>> windowedTopTen = s
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(30)))
        .apply(new SerialAggregation()).setParallelism(1);

where SerialAggregation extends RichAllWindowFunction<Tuple3<Long, String, Integer>, Tuple2<Long, String>, TimeWindow> and for each tumbling window a Tuple2<Long, String> (Long is the timestamp and String contains the top 10 keys).

The naive approach produces the correct result and the resulting data stream is ordered with ascending timestamps. Unfortunately, it does not leverage multi-threading and as a result when the input data are some GBs, the execution takes a while to complete.

Parallel (Faster) Approach

After looking into Flink's documentation on windows, I am trying to come up with a smarter way for implementing the Top 10 example by using parallelism > 1 and at the same time produce the correct result for each window. Therefore, I see that I need to transform s to a KeyedStream and then apply a window() transformation. In essence:

DataStream<Tuple2<Long, String>> windowedTopTen = s
    .keyBy(1)
    .window(TumblingEventTimeWindows.of(Time.minutes(30)))
    .apply(new PartialAggregation()).setParallelism(N);

Where PartialAggregation() would produce partial results (of disjoint key sets) for different timestamps. In other words, my understanding is that for the same timestamp t1 I will end up with partial_result_1 to partial_result_N where N is the parallelism that I have set. My goal is to aggregate all partial results for a particular timestamp (like t1), but I do not know how to do that. Also, when I am able to combine partial results with matching timestamps, how will I be able to produce a datastream, whose tuples are ordered based on the timestamps (like the result that the Naive Solution produces).

Questions

  1. How can I complete the Parallel (Faster) approach to produce the desired result and combine partial results with matching timestamps?
  2. After I combine partial results for each timestamp, is there a way to produce a data stream in which results appear ordered based on timestamps?
1

1 Answers

2
votes

First of all, it's going to be easier to combine the partial top 10 results into the overall top 10 if you replace your Tuple2 with a Tuple3 where the String is a single key, and the Integer is the counter.

Then you can tack on a second layer of windowing using windowAll and an aggregating window function that keeps the top 10 keys (overall), and their counts.