
I'm working with Kafka 2.0.1 and kafka-streams-scala 2.0.1.

DEBUG log messages such as:

DEBUG 2019-05-08 09:57:53,322 [he.kafka.clients.NetworkClient] [] [ ]: [Consumer clientId=XXX-bd6b071d-a44f-4253-a3a5-539d60a72dd3-StreamThread-1-consumer, groupId=XXX] Disconnecting from node YYY due to request timeout.

led me to increase the request.timeout.ms value:

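  import java.util.Properties
  import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

  private val config: Properties = new Properties
  // Raise the request timeout to 240 seconds
  config.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, "240000")
...
  private val streams: KafkaStreams = new KafkaStreams(topology, config)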

However, this sets request.timeout.ms to 240000 ms for both the AdminClientConfig and the ConsumerConfig, even though their defaults actually differ (120000 ms and 40000 ms respectively).

Is there any way to set the Kafka Streams configuration values for either the AdminClientConfig or the ConsumerConfig without overriding the values of both?

1 Answer


You can prefix any config with consumer. or admin. to apply it to only one client.

There are also the main.consumer., restore.consumer. and global.consumer. prefixes to distinguish the different consumers further. With the plain consumer. prefix, the config is applied to all consumers.

Finally, there is also a producer. prefix (mentioned just for completeness).

Compare the docs: https://docs.confluent.io/current/streams/developer-guide/config-streams.html#naming
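
As a rough sketch of the prefixes described above, applied to the question's request.timeout.ms case: the object name PrefixedTimeouts and the 120000 value for the admin client are only illustrative assumptions, and the keys are written as plain strings so the snippet needs nothing beyond java.util.Properties. A real application would still have to add application.id and bootstrap.servers before passing the properties to KafkaStreams.

```scala
import java.util.Properties

// Hypothetical helper object, not part of Kafka Streams.
object PrefixedTimeouts {
  // Builds Streams properties with a separate request timeout per client type.
  def build(): Properties = {
    val config = new Properties
    // "consumer." prefix: applies only to the consumers created by Kafka Streams.
    config.put("consumer.request.timeout.ms", "240000")
    // "admin." prefix: applies only to the internal admin client.
    // 120000 is an illustrative value chosen for this sketch.
    config.put("admin.request.timeout.ms", "120000")
    config
  }
}
```

StreamsConfig also offers helper methods such as StreamsConfig.consumerPrefix(...) and StreamsConfig.adminClientPrefix(...) that build the same prefixed keys, which may be preferable to hand-written strings.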