2 votes

I would like to understand how windows affect performance in a Flink job. I am running a Flink job consisting of the operators below:

Job Parallelism (4, 8, 16): [auto-gen source] --> [Map1] --> [Tumbling Window (10s)] --> [Map2] --> [Sink]

[Screenshot: Flink windowed performance at 4p, 8p, 16p]

The job above caps at around 50k records per second, regardless of how I scale the cluster from 4 to 16 parallelism.

When the window is removed, the job can achieve 200k records per second.

Job Parallelism (4-8): [auto-gen source] --> [Map1] --> [Map2] --> [Sink]

[Screenshot: Flink performance without window at 4p, 8p]

I removed the window's logic to rule out application code as the bottleneck, but it seems the window still drags down the throughput of the whole stream, even though the window is just a pass-through function.

This screenshot shows the performance of the job without a window, and of the job with an empty window enabled.

[Screenshot: Empty window shows spikes but the operator slowed down]

The stream is capped and cannot be scaled further. Is there any way to improve performance in this scenario?

Note: The job has no external service dependencies, and checkpointing is turned off.
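One reason an "empty" window can still cost throughput: a window operator must buffer every record in its window bucket until the window fires, then emit the whole bucket in a burst, regardless of what the window function does. A minimal sketch in plain Java (not Flink code; names are illustrative) of 10-second tumbling-window bucketing:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of tumbling-window bucketing: even a pass-through window
// function only runs when the 10s bucket closes, so records are held back and
// emitted in bursts instead of flowing through continuously.
public class TumblingSketch {
    static final long WINDOW_MS = 10_000L;

    // A record's window is identified by the start of its 10s bucket.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    public static void main(String[] args) {
        Map<Long, List<String>> buffer = new HashMap<>();
        long[][] records = { {1_000, 0}, {9_999, 1}, {10_000, 2}, {19_500, 3} };
        for (long[] r : records) {
            buffer.computeIfAbsent(windowStart(r[0]), k -> new ArrayList<>())
                  .add("event-" + r[1]);
        }
        // Records 0 and 1 share the [0, 10000) window; 2 and 3 share [10000, 20000).
        System.out.println(buffer.get(0L));      // [event-0, event-1]
        System.out.println(buffer.get(10_000L)); // [event-2, event-3]
    }
}
```

This burst-then-idle pattern would match the spiking throughput graph described above.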

Without seeing code, it is hard to diagnose your situation. What state backend are you using? How many unique keys do you have? What is the distribution of your keys? – Joshua DeWald
Hi @JoshuaDeWald, my stream doesn't have any state enabled when running this test. The job has 10 million unique keys, on a cluster of 3 task managers. – coffee_latte1020
It's rather interesting because I removed all the logic in the window in one of the test scenarios (i.stack.imgur.com/dZjX8.jpg); you can see the performance graph for the window spiking high when the time is reached. I can't explain why an empty window would drag down the whole stream's throughput, though; at least application-code-wise, it's quite straightforward. – coffee_latte1020

1 Answer

2 votes

Further investigation suggests it was caused by the keyBy() into a KeyedStream: throughput dropped by nearly 40% when the stream was keyed, and with more than one keyed stream, performance degrades further.

    DataStream<Tuple2<String, MyObj>> testStream = finalizedStream
        .map(new RichMapFunction<MyObj, Tuple2<String, MyObj>>() {

            @Override
            public Tuple2<String, MyObj> map(MyObj value) throws Exception {
                // Wrap each element in a Tuple2 so it can be keyed by a custom key.
                return new Tuple2<>(value.getCustomKey(), value);
            }
        });

    // Keying by tuple position 0 hash-partitions the stream across subtasks.
    KeyedStream<Tuple2<String, MyObj>, Tuple> keyedStream = testStream.keyBy(0);
    keyedStream.map(new MapFunction<Tuple2<String, MyObj>, MyObj>() {

        @Override
        public MyObj map(Tuple2<String, MyObj> tuple) throws Exception {
            // Unwrap back to the original object.
            return tuple.f1;
        }
    }).name("keyed stream");

Is there any way to make the performance better? Using a keyed stream seems unavoidable. I am also not sure why scaling the job in our environment didn't help: performance just caps at a fixed figure no matter how much parallelism we provide. CPU utilization and memory were not at critical levels when monitored with nmon.

Any input and help will be very much appreciated.