I'm using Flink to process streaming data from Kafka. The flow is pretty basic: consume from Kafka, enrich the data, and then sink to the file system.
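To make the setup concrete, here is a minimal sketch of such a pipeline (this is not my actual job; the topic name, output path, connector classes, and enrichment logic are placeholders and depend on the Flink version):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EnrichmentJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "enrichment-job");      // placeholder

        // Source: consume from Kafka
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props));

        // Enrichment: a simple map step standing in for the real logic
        DataStream<String> enriched = events.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value; // enrichment logic goes here
            }
        });

        // Sink: write to the file system
        enriched.addSink(
                StreamingFileSink
                        .forRowFormat(new Path("hdfs:///enriched"),
                                new SimpleStringEncoder<String>("UTF-8"))
                        .build());

        env.execute("enrichment-job");
    }
}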
In my case, the number of Kafka partitions is larger than the Flink parallelism level. I've noticed that Flink does not consume evenly from all partitions.
Once in a while, lag builds up on some Kafka partitions. Restarting the app helps Flink "rebalance" the consumption, and the lag closes quickly. However, after a while, I see lag on other partitions, and so on.
Seeing this behavior, I tried to even out the consumption rate by using rebalance(), as suggested in the Flink documentation:
"Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew."
dataStream.rebalance();
The change in the code was minor: simply adding rebalance() to the data stream source. Running the app with rebalance() caused very strange behavior in Flink:
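Concretely, the change was along these lines (building on the placeholder sketch above; rebalance() goes right after the Kafka source, before the enrichment step):

// rebalance() redistributes records round-robin from the source subtasks
// to the downstream operators, instead of the default forwarding.
DataStream<String> enriched = events
        .rebalance()
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value; // enrichment logic goes here
            }
        });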
I set the parallelism level to 260 and submitted the job, but for some reason the job manager multiplied the number of slots by 4. Looking at the execution plan chart, I realized that all the data is now consumed by 260 cores and then sent to 3 sinks (hopefully evenly). The job failed due to a lack of resources.
Since I wanted to use 260 cores, I tried to submit the job again, this time with a parallelism level of 65 (= 260/4). The job runs fine, but the processing rate is low. In the web UI, I discovered that the total number of slots does not equal available task slots + running tasks. But if I count rtbJsonRequest (the job I submitted) as a job with 65 (= 260/4) task slots, instead of the 260 that are shown, the numbers add up.
Long story short, I am trying to find a way to balance the consumption across the Kafka partitions. According to the Flink documentation, rebalance() is what I need, but apparently I am using it wrong.
Adding more details: there are 520 partitions in the topic and the parallelism level is 260 (so each core handles 2 partitions).
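To make that arithmetic explicit (an illustration only, not necessarily the exact assignment algorithm the Kafka connector uses):

// With 520 partitions spread over 260 source subtasks, a round-robin style
// assignment gives every subtask exactly 2 partitions.
int numPartitions = 520;
int parallelism = 260;
int[] partitionsPerSubtask = new int[parallelism];
for (int p = 0; p < numPartitions; p++) {
    partitionsPerSubtask[p % parallelism]++;
}
// every entry in partitionsPerSubtask ends up as 2 (= 520 / 260)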
I can clearly see that a few partitions have a very low consumption rate: