2
votes

I'm using Flink to process stream data from Kafka. The flow is pretty basic: consume from Kafka, enrich the data, and sink to the file system.

In my case, the number of partitions is greater than the Flink parallelism level. I've noticed that Flink does not consume evenly from all partitions.

Once in a while, lag builds up in some Kafka partitions. Restarting the app helps Flink "rebalance" the consumption, and the lag closes quickly. After a while, however, I see lag in other partitions, and so on.

Seeing this behavior, I tried to even out the consumption rate by using rebalance(), as suggested in the Flink documentation:

"Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew."

dataStream.rebalance();

The code change was minor: I simply added rebalance() to the data stream source. Running the app with rebalance() caused very strange behavior in Flink:

I set the parallelism level to 260 and submitted the job, but for some reason the job manager multiplied the number of slots by 4. Looking at the execution plan chart, I realized that all data is now consumed by 260 cores and then sent to 3 sinks (hopefully evenly). The job failed due to a lack of resources.

Since I wanted to use 260 cores, I submitted the job again, this time with a parallelism level of 65 (=260/4). The job runs fine, but the processing rate is low. In the web UI, I noticed that the total number of slots does not equal the available task slots plus the running tasks. The numbers only add up if I count rtbJsonRequest (the job I submitted) as using 65 (=260/4) task slots instead of the 260 that is displayed.

Long story short, I am trying to find a way to balance consumption across Kafka partitions. According to the Flink documentation, rebalance() is what I need, but apparently I am using it wrong.

Some additional details: there are 520 partitions in the topic and the parallelism level is 260 (each core has 2 partitions).
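To make the "2 partitions per core" arithmetic concrete, here is a minimal plain-Java sketch of a round-robin partition-to-subtask assignment. It is modeled on how the Flink Kafka connector is generally understood to spread partitions over source subtasks, but the exact formula, the class name `PartitionSpreadSketch`, and the topic name `my-topic` are assumptions for illustration, not something taken from the job above.

```java
import java.util.Arrays;

// Hypothetical sketch, not Flink code: models a round-robin style
// assignment of Kafka partitions to parallel source subtasks.
class PartitionSpreadSketch {

    /** Subtask index for a partition: a topic-derived start index plus the partition number. */
    static int assign(String topic, int partition, int parallelism) {
        // Assumed formula; the mask keeps the start index non-negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    /** Counts how many partitions each subtask ends up owning. */
    static int[] partitionsPerSubtask(String topic, int numPartitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < numPartitions; p++) {
            counts[assign(topic, p, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 520 partitions over 260 subtasks, as in the question.
        int[] counts = partitionsPerSubtask("my-topic", 520, 260);
        int min = Arrays.stream(counts).min().getAsInt();
        int max = Arrays.stream(counts).max().getAsInt();
        System.out.println("min=" + min + " max=" + max); // prints "min=2 max=2"
    }
}
```

Under this assumption every subtask owns exactly two partitions, so the number of partitions per subtask is already even. Uneven lag would then have to come from how fast individual subtasks read, not from how many partitions they hold.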

I can clearly see that a few partitions have a very low consumption rate.

3
Are the partitions themselves balanced? I.e., how is the data split between partitions? - Dominik Wosiński
Yes, the incoming rate per partition looks normal and even. - EyalP

3 Answers

1
votes

Inserting a rebalance after the sources will not balance the sources themselves, but rather it will balance the inputs to what follows, by inserting a round-robin network shuffle into the job graph. The most this can accomplish is to even out the load on the sinks, which doesn't help with your issue.
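As a rough illustration of that point, the following plain-Java simulation (not Flink code) pushes the records emitted by a few source subtasks through a round-robin shuffle. The class name `RebalanceSketch` and the record counts are made up for the example. The downstream operators end up with equal load, while the slow source still emits exactly as little as before.

```java
import java.util.Arrays;

// Hypothetical simulation of a round-robin shuffle; it only mimics the
// idea behind rebalance() and does not use any Flink API.
class RebalanceSketch {

    /** Spreads the records of every source subtask round-robin over the downstream subtasks. */
    static long[] roundRobin(long[] emittedPerSource, int downstreamParallelism) {
        long[] received = new long[downstreamParallelism];
        for (long emitted : emittedPerSource) {
            int next = 0; // each source subtask keeps its own round-robin pointer
            for (long i = 0; i < emitted; i++) {
                received[next]++;
                next = (next + 1) % downstreamParallelism;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // One slow source subtask and three healthy ones (made-up numbers).
        long[] emittedPerSource = {1_000, 4_000, 4_000, 4_000};
        long[] received = roundRobin(emittedPerSource, 4);

        // Downstream load is even after the shuffle...
        System.out.println(Arrays.toString(received)); // prints [3250, 3250, 3250, 3250]
        // ...but the source side is untouched: the slow subtask still read only 1000 records.
        System.out.println(Arrays.toString(emittedPerSource));
    }
}
```

The shuffle changes who processes a record after it has been read, not how quickly any given partition is read from Kafka.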

How many Kafka partitions are you consuming from, in total? Are you using topic or partition discovery? It does seem odd that restarting the job is helpful.

1
votes

I discovered that 2 of my Flink task managers have a very low processing rate compared to the other workers.

Those two process fewer than 5K events per second, while the others process at least 37K.

This helped me understand that I had an environmental issue rather than a Flink one. In my case, installing a CPU governor and rebooting the machine resolved the problem.

One important thing I learned along the way: by default, Flink does not discover new Kafka partitions. To enable discovery, add the following to your consumer properties:

"flink.partition-discovery.interval-millis", "time_interval"

0
votes
Properties properties = new Properties();
properties.setProperty("group.id", consumerGroup);
properties.setProperty("auto.offset.reset", autoOffsetReset);
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty(
    "flink.partition-discovery.interval-millis", "30000");

I added partition discovery to the properties, and the job throws an NPE. Is this the correct way to set the partition discovery property?

java.lang.NullPointerException: null
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.getAllPartitionsForTopics(Kafka09PartitionDiscoverer.java:77)