
We are using Kafka to implement an event-driven application, and we see a lot of rebalancing between the consumers in the group.

We poll a maximum of 100 events each time, and processing a single event takes 2-10 minutes. We keep a TTL for each message, and after a while most of the messages are expired (it takes more than 1.5 hours until a message is consumed). Currently we produce about 10000 messages to this topic per hour and have 3 consumers. The behavior we see is that while producing 10000 messages in one hour, we consume 25000 messages in the same time, i.e. the same message is consumed by more than one consumer. We are using the default commit strategy.
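(Doing the arithmetic on the numbers above: at 100 records per poll and 2-10 minutes per record, one pass through the poll loop can take 200-1000 minutes, while the default max.poll.interval.ms is only 5 minutes (300000 ms). So the coordinator assumes each consumer has died, rebalances, and redelivers the uncommitted batch to another consumer, which would explain seeing 25000 consumed for 10000 produced.)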

We get a lot of:

failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

I found:

CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

and I want to try it, but I don't know how to configure group.max.session.timeout.ms on the broker.

I also found:

Why can't I increase session.timeout.ms?

How can I get the ConsumerConfig content as described in that question?

Thanks, Eilon


1 Answer


I'm not sure about the way you launch your broker and clients, but assuming you launch them from the console using the scripts in Kafka's bin folder, these are the steps you should follow:

  • Broker side

You should increase the group.max.session.timeout.ms parameter, for example to double its current value. This parameter defines, as stated in the latest version of the documentation, "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures." When you launch a Kafka broker, the simplest command is as follows:

bin/kafka-server-start.sh config/server.properties

You must modify that config/server.properties file by adding the parameter. Just for the example it is shown here under Server Basics (you may prefer to place it further down):

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker. 
broker.id=0

# Group session timeout! Yep, this one (default is 1800000):
group.max.session.timeout.ms=3600000

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from

...

Relaunch Kafka and you're done on the broker side.

  • Consumer side

As you already know, now it's time to modify the session.timeout.ms parameter on the client side. This parameter defines (and this is a long one):

The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

When you launch the consumer, the simplest command is as follows (broker address and topic name are placeholders for your own):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --consumer.config config/myconsumer.properties

In your consumer properties file, you should add or modify the parameter. For example:

(...)
# Consumer session timeout! (default is 10000)
session.timeout.ms=20000
(...)

Normally the Consumer API is launched by a client (Java, ...) and the properties are loaded by reading the program's launch arguments. Relaunch the consumer(s) and you're done.
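If your consumers are plain Java clients instead, a minimal sketch of setting the same parameters programmatically could look like the following. Broker address, group id and topic name are assumptions to adjust to your setup; max.poll.records and max.poll.interval.ms are included because, with 2-10 minutes of processing per record, they are usually what actually stops the rebalances:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TunedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: your broker address
        props.put("group.id", "my-group");                // assumption: your consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Must stay within [group.min.session.timeout.ms, group.max.session.timeout.ms]
        // as configured on the broker.
        props.put("session.timeout.ms", "20000");

        // With 2-10 minutes per record, keep each poll small and allow enough
        // time between polls: 1 record per poll, 20 minutes > 10-minute worst case.
        props.put("max.poll.records", "1");
        props.put("max.poll.interval.ms", "1200000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // assumption: your topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> process(record.value()));
            }
        }
    }

    private static void process(String value) {
        // the 2-10 minute work goes here
    }
}

As a bonus, creating the consumer logs the full list of ConsumerConfig values at INFO level, which is one way to see the effective configuration you asked about.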

Maybe off topic, but beyond Kafka's configuration tuning, you could also check whether you have the possibility to decouple consuming and processing (for example, by handing records off to some kind of resource pool, such as a thread pool), as in the sketch below.
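For instance, a minimal sketch of that decoupling with the Java client, using pause()/resume() and a single worker thread (names and sizes are illustrative assumptions, and it ignores rebalance listeners for brevity):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: your broker address
        props.put("group.id", "my-group");                // assumption: your consumer group
        props.put("enable.auto.commit", "false");         // commit manually, after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // assumption: your topic
            Future<?> inFlight = null;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                if (!records.isEmpty()) {
                    // Hand the batch to the worker and stop fetching until it is done.
                    consumer.pause(consumer.assignment());
                    inFlight = worker.submit(() -> records.forEach(r -> process(r.value())));
                }
                if (inFlight != null && inFlight.isDone()) {
                    consumer.commitSync(); // commit only once the batch is fully processed
                    consumer.resume(consumer.paused());
                    inFlight = null;
                }
            }
        }
    }

    private static void process(String value) {
        // the 2-10 minute work goes here
    }
}

While the partitions are paused, poll() keeps the consumer alive in the group but returns no records, so max.poll.interval.ms no longer depends on how long the processing takes.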

Hope it helps!