I have written a small test case in Flink to sort a DataStream. The code is as follows:

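import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public enum StreamSortTest {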
    ;
    public static class MyProcessWindowFunction extends ProcessWindowFunction<Long,Long,Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context ctx, Iterable<Long> input, Collector<Long> out) {
            List<Long> sortedList = new ArrayList<>();
            for(Long i: input){
                sortedList.add(i);
            }
            Collections.sort(sortedList);
            sortedList.forEach(l -> out.collect(l));
        }
    }

    public static void main(final String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);

        DataStream<Long> probeSource = env.fromSequence(1, 500).setParallelism(2);

        // range partition the stream into two parts based on data value
        DataStream<Long> sortOutput =
                probeSource
                        .keyBy(x->{
                            if(x<250){
                                return 1;
                            } else {
                                return 2;
                            }
                        })
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
                        .process(new MyProcessWindowFunction())
                        ;

        sortOutput.print();
        System.out.println(env.getExecutionPlan());
        env.executeAsync();
    }
}

However, the program prints only the execution plan and a few other lines; it never prints the sorted numbers. What am I doing wrong?

Do you run this locally? - Dominik Wosiński
@Dominik Yes, it is run locally in IntelliJ. - AvinashK

1 Answer

The main problem I can see is that you are using a processing-time window with a very small input, which will certainly be processed in well under 20 seconds. Flink can detect the end of a bounded input (a file, or a sequence as in your case) and then emits a watermark of Long.MAX_VALUE, which closes all open event-time windows and fires all event-time timers. It does not do the same for processing-time computations. So in your case you have to make sure yourself that the job keeps running long enough for the window to close, or switch to a custom trigger or to event time.
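To make the difference concrete, here is a toy model in plain Java. It is not Flink code and not how Flink is implemented; the class `WindowFlushModel`, its methods, and the single 20-second window end are all made up for illustration. It only mimics the rule described above: a processing-time window fires if the job is still running when the wall clock reaches the window end, while an event-time window fires as soon as the watermark passes the window end, which the final Long.MAX_VALUE watermark always does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: hypothetical names, no Flink dependency.
public class WindowFlushModel {
    // End of the single 20-second window, in milliseconds (assumed value).
    static final long WINDOW_END_MS = 20_000L;

    // Buffer the sequence 1..500 per key, like a keyed window operator would.
    static Map<Integer, List<Long>> bufferInput() {
        Map<Integer, List<Long>> windows = new TreeMap<>();
        for (long x = 1; x <= 500; x++) {
            int key = x < 250 ? 1 : 2; // same key selector as in the question
            windows.computeIfAbsent(key, k -> new ArrayList<>()).add(x);
        }
        return windows;
    }

    // Emit every buffered window in sorted order, key by key.
    static List<Long> fireAll(Map<Integer, List<Long>> windows) {
        List<Long> out = new ArrayList<>();
        for (List<Long> window : windows.values()) {
            List<Long> sorted = new ArrayList<>(window);
            Collections.sort(sorted);
            out.addAll(sorted);
        }
        return out;
    }

    // Processing time: the window fires only if the job is still alive
    // when the wall clock reaches the window end.
    static List<Long> runProcessingTime(long jobRuntimeMs) {
        Map<Integer, List<Long>> windows = bufferInput();
        return jobRuntimeMs >= WINDOW_END_MS ? fireAll(windows) : new ArrayList<>();
    }

    // Event time: end of input produces a final watermark of Long.MAX_VALUE,
    // which is past every window end, so all windows fire.
    static List<Long> runEventTime() {
        Map<Integer, List<Long>> windows = bufferInput();
        long finalWatermark = Long.MAX_VALUE;
        return finalWatermark >= WINDOW_END_MS ? fireAll(windows) : new ArrayList<>();
    }

    public static void main(String[] args) {
        System.out.println(runProcessingTime(1_000L).size());  // prints 0
        System.out.println(runProcessingTime(25_000L).size()); // prints 500
        System.out.println(runEventTime().size());             // prints 500
    }
}
```

In Flink terms, the event-time variant would roughly correspond to assigning timestamps and watermarks to the stream and using TumblingEventTimeWindows instead of TumblingProcessingTimeWindows. I have not tested that against your exact job.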

One other thing I am not sure about, since I have never used it much, is whether you should use executeAsync for local execution. According to the docs, it is meant for situations where you don't want to wait for the result of the job.