
I have a WordCount program running on a 4-worker-node Flink cluster, reading data from a Kafka topic.

The topic is pre-loaded with a large amount of text (words), and the words follow a Zipf distribution. The topic has 16 partitions, each holding around 700 MB of data.
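The job has the usual WordCount shape; a minimal sketch of what it looks like (the topic name "words", the broker address and the group id below are placeholders):

```java
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(16); // 4 worker nodes x 4 task slots each

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder address
        props.setProperty("group.id", "wordcount");           // placeholder group id

        env.addSource(new FlinkKafkaConsumer<>("words", new SimpleStringSchema(), props))
            // emit one (word, 1) pair per Kafka record
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) {
                    return Tuple2.of(word, 1);
                }
            })
            .keyBy(value -> value.f0) // all occurrences of a word go to the same subtask
            .sum(1)                   // the "Keyed Reduce" stage
            .print();                 // the "Sink" stage

        env.execute("WordCount");
    }
}
```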

One node is always much slower than the others. As you can see in the picture, worker2 is the slow node here, but it is not always worker2; in other test runs, worker3 or another node in the cluster has been the slow one.

However, there is always exactly one such slow worker node in the cluster. Each worker node has 4 task slots, so 16 task slots in total.

After some time, the "records sent" count for the other worker nodes stops increasing, while the count for the slow node keeps climbing, at a much faster rate than before, until it reaches the same level as the others.

Can anyone explain why this happens? And what am I doing wrong in my setup?

[Screenshots: Flink dashboard showing records sent per worker, with worker2 lagging behind]

Here is the throughput of the cluster (word counts at the Keyed Reduce -> Sink stage):

[Screenshot: per-node throughput at the Keyed Reduce -> Sink stage]

From this picture we can see that the throughput of the slow node, node2, is much higher than that of the others, which means node2 received more records from the first stage. I think this is caused by the Zipf distribution of the words in the topic: the words with very high frequency are all mapped to node2.

When a node spends more compute resources on the Keyed Reduce -> Sink stage, its speed of reading data from Kafka decreases. Once all the data in the partitions consumed by node1, node3 and node4 has been processed, the overall throughput of the cluster drops.
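The routing itself is deterministic: Flink hashes each key into a key group and maps that key group to a subtask. With flink-runtime on the classpath, this routing can be checked offline; a small sketch (assuming Flink's default max parallelism of 128, with a few high-frequency English words as example keys):

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRouting {
    public static void main(String[] args) {
        int parallelism = 16;     // 4 nodes x 4 slots
        int maxParallelism = 128; // Flink's default max parallelism at this scale
        for (String word : new String[]{"the", "of", "and", "a", "to"}) {
            // the subtask index that keyBy would route this word to
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                word, maxParallelism, parallelism);
            System.out.println(word + " -> subtask " + subtask);
        }
    }
}
```

If several of the top-ranked words happen to land on subtasks hosted by the same worker node, that node receives a disproportionate share of the records.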

In this case, there is no difference between the partitions of the Kafka topic; the reason is the skewed data in the whole topic. The cluster has 4 worker nodes and each worker node has 4 task slots. It seems that all slots on worker node2 receive more data than the slots on other nodes. What if we swapped two slots of node2 with node1? Even if we cannot balance the workload per slot, we could still balance the workload per node, since each node has multiple task slots. Then all partitions of the topic would be consumed at a similar speed. – Jun

1 Answer


As your data follows a Zipf distribution, this behavior is expected. Some workers simply receive more data due to the imbalance in the distribution itself. You would observe the same behavior in other systems, too.
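The effect is easy to reproduce with a small simulation. The sketch below spreads the Zipf frequency mass of a hypothetical 10,000-word vocabulary over 4 workers by hash partitioning, with Java's hashCode standing in for Flink's key-group hash:

```java
import java.util.Arrays;

public class ZipfSkew {
    public static void main(String[] args) {
        int vocab = 10_000, workers = 4;
        double[] load = new double[workers];

        // Zipf: the frequency of the word with rank r is proportional to 1/r.
        double total = 0;
        for (int rank = 1; rank <= vocab; rank++) {
            total += 1.0 / rank;
        }

        // Assign each word to a worker by hash, as a keyBy would,
        // and accumulate the share of total records it brings along.
        for (int rank = 1; rank <= vocab; rank++) {
            String word = "word" + rank; // stand-in for the real vocabulary
            int worker = Math.floorMod(word.hashCode(), workers);
            load[worker] += (1.0 / rank) / total;
        }

        // The worker that happens to receive the top-ranked words
        // carries a disproportionate share of the record volume.
        System.out.println(Arrays.toString(load));
    }
}
```

The worker that draws rank 1 alone inherits roughly 10% of all records on top of its fair 25% share, which is exactly the kind of per-node imbalance you see in your dashboard.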