21 votes

I want to increase session.timeout.ms to allow more time for processing the messages received between poll() calls. However, when I change session.timeout.ms to a value higher than 30000, the Consumer object fails to be created and throws the error below.

Could anyone tell me why I can't increase the session.timeout.ms value, or what I am missing?

0    [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 

request.timeout.ms = 40000
check.crcs = true
retry.backoff.ms = 100
ssl.truststore.password = null
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 262144
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
session.timeout.ms = 40000
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [server-name:9092]
client.id = 
fetch.max.wait.ms = 500
fetch.min.bytes = 50000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
sasl.kerberos.kinit.cmd = /usr/bin/kinit
auto.offset.reset = latest
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
ssl.endpoint.identification.algorithm = null
max.partition.fetch.bytes = 2097152
ssl.keystore.location = null
ssl.truststore.location = null
ssl.keystore.password = null
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
security.protocol = PLAINTEXT
auto.commit.interval.ms = 5000
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
group.id = test7
enable.auto.commit = false
metric.reporters = []
ssl.truststore.type = JKS
send.buffer.bytes = 131072
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
heartbeat.interval.ms = 3000

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:518)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:500)

5 – Can you post the entire error stack? There are not enough details to help here. – Gwen Shapira

5 Answers

26 votes

These conditions need to be kept in mind when changing session.timeout.ms (see the example configuration after this list):

  1. group.max.session.timeout.ms in the server.properties > session.timeout.ms in the consumer.properties.
  2. group.min.session.timeout.ms in the server.properties < session.timeout.ms in the consumer.properties.
  3. request.timeout.ms > session.timeout.ms + fetch.max.wait.ms
  4. (session.timeout.ms)/3 > heartbeat.interval.ms
  5. session.timeout.ms > worst-case processing time of the ConsumerRecords returned per poll() (in ms).
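
As a concrete illustration, here is a minimal sketch of a Java consumer configuration that satisfies conditions 1-5 for the 40000 ms value from the question. The bootstrap server and group id are taken from the question's log; the other values are assumptions chosen to be consistent, and the broker must already permit session timeouts up to 40000 ms (conditions 1 and 2):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SessionTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server-name:9092"); // from the question's log
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test7");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // Conditions 1 and 2: 40000 must lie between the broker's
            // group.min.session.timeout.ms and group.max.session.timeout.ms.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "40000");

            // Condition 3: request.timeout.ms > session.timeout.ms + fetch.max.wait.ms
            // (40000 + 500 = 40500, so 45000 leaves headroom).
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "45000");

            // Condition 4: heartbeat.interval.ms < session.timeout.ms / 3
            // (10000 < 40000/3 ≈ 13333).
            props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");

            // Condition 5 is operational: the records returned by each poll()
            // must be processed in well under 40000 ms.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Construction now succeeds; subscribe and poll as usual.
            }
        }
    }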
19 votes

The allowed range of the consumer session timeout is controlled by the broker settings group.max.session.timeout.ms (default 30 s) and group.min.session.timeout.ms (default 6 s).

You should first increase group.max.session.timeout.ms on the broker side; otherwise you will get "The session timeout is not within an acceptable range.".
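
For example, to allow the 40000 ms value from the question, the broker's server.properties could be changed along these lines before restarting the broker (60000 is an arbitrary illustrative ceiling; any value above the desired consumer session.timeout.ms works):

    # server.properties (broker side)
    group.max.session.timeout.ms=60000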

1 vote

I am using spring-kafka.

I had added the following config, but the consumer still was not coming up:

    // Consumer properties pulled from the Spring Environment
    buildProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, env.getProperty("kafka.user-events-min-bytes"));
    buildProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, env.getProperty("kafka.user-events-wait-time-ms"));
    buildProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("kafka.user-events-wait-time-ms"));
    buildProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, env.getProperty("kafka.user-events-request-timeout-ms"));
    buildProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, env.getProperty("kafka.user-events-wait-time-ms"));

I figured out it was failing because the container's poll timeout was 1000; adding the following config helped:

factory.getContainerProperties().setPollTimeout(Integer.parseInt(env.getProperty("kafka.user-events-wait-time-ms")));
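
For readers less familiar with spring-kafka, here is a minimal sketch of where that call fits; the class name and bean wiring are assumptions, and the property keys are the ones used in this answer:

    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    public class KafkaListenerConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(Environment env) {
            Map<String, Object> buildProperties = new HashMap<>();
            // ... the ConsumerConfig entries shown above, plus bootstrap
            // servers, group id, and deserializers ...

            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(buildProperties));
            // The container-level poll timeout that fixed the issue:
            factory.getContainerProperties().setPollTimeout(
                    Integer.parseInt(env.getProperty("kafka.user-events-wait-time-ms")));
            return factory;
        }
    }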
0 votes

While the other answers to this question correctly describe the error and how to increase session.timeout.ms, there is a better and more direct way to address the original goal:

allow longer time for processing the messages received between poll() calls

The best way to achieve this in modern Kafka versions is to directly set max.poll.interval.ms in the consumer configuration to a higher value.
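
A minimal sketch of this with the Java client follows; the topic name, the 15-minute value, and the process() helper are hypothetical, and the bootstrap server and group id are reused from the question:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SlowProcessingConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server-name:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test7");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Allow up to 15 minutes between poll() calls; session.timeout.ms
            // can stay at its default because heartbeats are sent from a
            // background thread (KIP-62).
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        process(record); // slow work, bounded only by max.poll.interval.ms
                    }
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            // placeholder for long-running per-record processing
        }
    }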

Many contemporary client libraries are based on librdkafka, which sends heartbeats from a background thread. The librdkafka CONFIGURATION documentation describes session.timeout.ms as:

Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance.

Whereas max.poll.interval.ms (which defaults to 300000 ms, or 5 minutes) is described as:

Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may not be possible at this point. Note: It is recommended to set enable.auto.offset.store=false for long-time processing applications and then explicitly store offsets (using offsets_store()) after message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information.

Heartbeat support (KIP-62) was added to Kafka in version 0.10.1. The reason this is better than increasing session.timeout.ms is that the broker can distinguish between the consumer client disappearing entirely (e.g., crashing, network interruptions) and a long processing time. In the former case, the broker can rebalance to another consumer faster.

0 votes

How to set max.poll.records in Kafka-Connect API

It was solved: I added the configuration below in connect-avro-standalone.properties.

group.id=mygroup
consumer.max.poll.records=1000