
Here's the case: we have 3 Kafka topics (each with 50 partitions). They carry different message types, but all of the messages have a 'username' field,

topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute
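
For context, the three input streams are consumed roughly like this (a sketch, not the original code; the consumer properties and the Message01Schema etc. deserialization schemas are assumptions),

Properties props = new Properties();
props.setProperty("bootstrap.servers", "...");
props.setProperty("group.id", "...");

// one consumer per topic; each topic has 50 partitions
DataStream<Message01> source1 = env.addSource(
        new FlinkKafkaConsumer<>("topic_1", new Message01Schema(), props));
DataStream<Message02> source2 = env.addSource(
        new FlinkKafkaConsumer<>("topic_2", new Message02Schema(), props));
DataStream<Message03> source3 = env.addSource(
        new FlinkKafkaConsumer<>("topic_3", new Message03Schema(), props));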

And we've defined a wrapper class,

public class MessageWrapper {
    private List<Message01> list01;
    private List<Message02> list02;
    private List<Message03> list03;
    // plus the usual getters/setters (getList01()/setList01(), ...)
}

We have a flatMap which converts each original message into a Tuple3,

String field --> username
Integer field --> type (1, 2 or 3, identifying the source topic)
MessageWrapper field --> the wrapper object

All 3 streams are processed by a similar flatMap() function; here is the one for Message01,

@Override
public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
        throws Exception {
    String name = value.getUsername();
    // drop messages without a username
    if (!StringUtils.isBlank(name)) {
        // wrap the single message and tag it with type 1 (= topic_1)
        MessageWrapper wrapper = new MessageWrapper();
        List<Message01> list = new ArrayList<>();
        list.add(value);
        wrapper.setList01(list);
        out.collect(new Tuple3<>(name, 1, wrapper));
    }
}
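
The flatMaps for the other two topics follow the same pattern; only the type tag and the target list differ. A sketch for Message02 (assuming it exposes the same getUsername()),

@Override
public void flatMap(Message02 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
        throws Exception {
    String name = value.getUsername();
    if (!StringUtils.isBlank(name)) {
        MessageWrapper wrapper = new MessageWrapper();
        List<Message02> list = new ArrayList<>();
        list.add(value);
        wrapper.setList02(list);
        // type 2 marks messages from topic_2
        out.collect(new Tuple3<>(name, 2, wrapper));
    }
}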

After the flatMap(), we union these 3 streams,

stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
        .process(
                new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {

                    @Override
                    public void process(Tuple key,
                            Context ctx,
                            Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
                            Collector<MessageWrapper> out) throws Exception {
                        // merge all entities which have the same username, to get one big fat wrapper object
                        MessageWrapper merged = new MessageWrapper();
                        merged.setList01(new ArrayList<>());
                        merged.setList02(new ArrayList<>());
                        merged.setList03(new ArrayList<>());
                        for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
                            MessageWrapper wrapper = t3.f2;
                            Integer type = t3.f1;
                            if (type == 1) {
                                merged.getList01().addAll(wrapper.getList01());
                            } else if (type == 2) {
                                merged.getList02().addAll(wrapper.getList02());
                            } else if (type == 3) {
                                merged.getList03().addAll(wrapper.getList03());
                            }
                        }

                        // emit only when all 3 topics contributed at least one message
                        if (!merged.getList01().isEmpty() && !merged.getList02().isEmpty()
                                && !merged.getList03().isEmpty()) {
                            out.collect(merged);
                        }
                    }
                });
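
Note that a ProcessWindowFunction with no incremental pre-aggregation keeps every Tuple3 of the 5-minute window in window state until the window fires. One quick way to check whether sheer element volume per key is the issue (as also suggested in the comments below) is to count elements instead of collecting them, e.g. with an incremental aggregate. A sketch, not part of the original job,

stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
        .aggregate(new AggregateFunction<Tuple3<String, Integer, MessageWrapper>, Long, Long>() {
            // only the Long accumulator is kept in state, not the elements
            @Override
            public Long createAccumulator() { return 0L; }
            @Override
            public Long add(Tuple3<String, Integer, MessageWrapper> value, Long acc) { return acc + 1; }
            @Override
            public Long getResult(Long acc) { return acc; }
            @Override
            public Long merge(Long a, Long b) { return a + b; }
        });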

Currently we use 20 task managers, each with 4 cores + 16 GB (80 slots in total), and we use a parallelism of 50 throughout.

We always run into the problem that a task manager stops responding due to too many full GCs,

"Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost."

If we reduce the time window from 5 minutes to 1 minute, everything is fine. According to this, it seems the Flink cluster doesn't have enough resources, but 80 cores + 320 GB for just a few million messages (each about 5 KB in size) should be enough, right?
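
For scale, a rough back-of-envelope calculation using the numbers above (raw payload only; Java object overhead typically multiplies this several times),

(3,000,000 + 70,000 + 50,000) msgs/min x 5 KB ≈ 15.6 GB/min
15.6 GB/min x 5 min window                    ≈ 78 GB buffered per window
78 GB / 50 parallel window tasks              ≈ 1.6 GB raw payload per slot (of ~4 GB available)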

Could anyone shed some light here? Or are there perhaps some problems in the code?

When does this actually happen? Right after the start, or when the first window is processed? According to your specs it seems like each task manager has a parallelism of 4, right (20x4=80)? That means per slot you only have 4 GB and one core, which is not that much any more. Maybe decreasing the task manager parallelism helps. – TobiSH
@TobiSH Normally this happens several minutes after the start; it could be 3 minutes or 10 minutes, it's different each time. – gfytd
Do you have any insight into whether the data is unbalanced? Some users could have millions of events which you try to keep in memory. If not: instead of aggregating them, you could just count them to see if this might be the problem. – TobiSH
What implementation of the List<Message01> in your MessageWrapper do you use? It should be one which is optimized for inserting. – TobiSH
@TobiSH ArrayList – gfytd

1 Answer


I solved this problem on my cluster setup by commenting out the line with 127.0.1.1 in the /etc/hosts file of all machines, and by increasing the number of slots via the taskmanager.numberOfTaskSlots property in the conf/flink-conf.yaml file.
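
For example, a sketch of the two changes (the hostname and the slot count are placeholders, not values from the original setup),

# /etc/hosts -- comment out the 127.0.1.1 line on every machine
#127.0.1.1   <your-hostname>

# conf/flink-conf.yaml -- raise the number of slots per task manager
taskmanager.numberOfTaskSlots: 4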