Here's the case: we have 3 Kafka topics (each with 50 partitions). They carry different message types, but all of these messages have a 'username' field:
topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute
And we've defined a wrapper class:
public class MessageWrapper {
    private List<Message01> list01;
    private List<Message02> list02;
    private List<Message03> list03;
    // plus the usual getters and setters
}
We have a flatMap which converts each original message to a Tuple3:
String field --> username
Integer field --> type (1, 2 or 3, indicating which topic the message came from)
MessageWrapper field --> the wrapper object holding the original message
All 3 streams are processed by a similar flatMap() function, e.g. for Message01:
public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
throws Exception {
String name = value.getUsername();
if (!StringUtils.isBlank(name)) {
// wrap the single message so all three streams share the same Tuple3<username, type, wrapper> output type
MessageWrapper wrapper = new MessageWrapper();
List<Message01> list = new ArrayList<>();
list.add(value);
wrapper.setList01(list);
out.collect(new Tuple3<>(name, 1, wrapper));
}
}
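For completeness, the 3 streams are built roughly like this (just a sketch to show the shape of the job; the consumer setup, schema classes and flatMap class names below are placeholders, not our exact code):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder brokers
props.setProperty("group.id", "wrapper-job");         // placeholder group id

// env is the StreamExecutionEnvironment; one Kafka source + flatMap per topic,
// and the Integer in the Tuple3 marks the origin topic (1, 2 or 3)
DataStream<Tuple3<String, Integer, MessageWrapper>> stream1 = env
        .addSource(new FlinkKafkaConsumer<>("topic_1", new Message01Schema(), props))
        .flatMap(new Message01ToWrapper());
DataStream<Tuple3<String, Integer, MessageWrapper>> stream2 = env
        .addSource(new FlinkKafkaConsumer<>("topic_2", new Message02Schema(), props))
        .flatMap(new Message02ToWrapper());
DataStream<Tuple3<String, Integer, MessageWrapper>> stream3 = env
        .addSource(new FlinkKafkaConsumer<>("topic_3", new Message03Schema(), props))
        .flatMap(new Message03ToWrapper());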
After the flatMap(), we union the 3 streams, key by username, and apply a 5-minute window:
stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
.process(
new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {
@Override
public void process(Tuple key,
ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>.Context ctx,
Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
Collector<MessageWrapper> out) throws Exception {
        // merge all elements that share the same username into one big wrapper object
        MessageWrapper merged = new MessageWrapper();
        merged.setList01(new ArrayList<>());
        merged.setList02(new ArrayList<>());
        merged.setList03(new ArrayList<>());
        for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
            MessageWrapper wrapper = t3.f2;
            Integer type = t3.f1;
            if (type == 1) {
                merged.getList01().addAll(wrapper.getList01());
            } else if (type == 2) {
                merged.getList02().addAll(wrapper.getList02());
            } else if (type == 3) {
                merged.getList03().addAll(wrapper.getList03());
            }
        }
        // only emit users that showed up in all 3 topics within the window
        if (!merged.getList01().isEmpty()
                && !merged.getList02().isEmpty()
                && !merged.getList03().isEmpty()) {
            out.collect(merged);
        }
}
});
Currently we use 20 TaskManagers, each with 4 cores + 16 GB, 80 slots in total, and we use a parallelism of 50 throughout.
We always run into the problem that a TaskManager stops responding because of too many full GCs, and then we see:
"Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost."
If we reduce the time window from 5 minutes to 1 minute, everything is fine. Based on this, it seems like the Flink cluster doesn't have enough resources, but 80 cores + 320 GB for just a few million messages per minute (each message is about 5 KB) should be enough, right?
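As a rough back-of-the-envelope with the numbers above (treating 1 GB as 10^6 KB and ignoring Java object overhead):
50,000 + 3,000,000 + 70,000 ≈ 3.12 million messages per minute
3.12 million msg/min × 5 KB ≈ 15.6 GB of raw payload per minute
15.6 GB/min × 5-minute window ≈ 78 GB held in window state across the cluster
78 GB / 20 TaskManagers ≈ ~4 GB of raw payload per 16 GB TaskManager, before Tuple3/List/wrapper overhead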
Could anyone shed some light here? Or are there perhaps some problems in the code?