Thanks for your patience.
- After adding partitions to a topic online, the Kafka consumer stops reading messages and no exception is thrown. The consumer just blocks, and each time we have to restart it. This seems unreasonable, and I cannot find any documentation about this behavior.
Moreover, the consumer thread does not resume when an error occurs while processing a message. Our consumer reads messages and inserts them into MySQL. Once the network failed and the consumer could not connect to MySQL, it blocked and stopped reading messages until we restarted it.
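To keep the consumer thread alive when the downstream insert fails, we are considering wrapping the MySQL insert in a bounded retry loop. A minimal sketch of the pattern (the helper name, retry count, and backoff are our own placeholders, not code from the examples we follow):

```java
import java.util.concurrent.Callable;

public class RetryingHandler {
    // Retry a message-handling action so a transient failure (e.g. a MySQL
    // network outage) does not silently kill the consumer thread.
    public static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;                // remember the failure and back off
                Thread.sleep(backoffMs);
            }
        }
        throw last;                      // give up after maxAttempts tries
    }
}
```

The idea is to call `withRetry(() -> insertIntoMySql(message), maxAttempts, backoffMs)` inside the stream-iteration loop, so a failed insert is retried instead of stalling the iterator; `insertIntoMySql` here is hypothetical.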
- What happens to old data and new data when a partition is added? The docs (https://kafka.apache.org/documentation.html#basic_ops_modify_topic) say:
"Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data so this may disturb consumers if they rely on that partition. That is if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way."
What does "not attempt to automatically redistribute data" mean? Does it mean that the old data stay in their original partitions unchanged, and that new data will not be sent to the added partitions?
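For example, if routing really is hash(key) % number_of_partitions as the quoted docs describe (an assumption on our side about our producer's partitioner), then the same key can map to a different partition once the partition count changes, while messages already written under the old count are not moved:

```java
public class PartitionShuffle {
    // Simplified partitioner from the docs: hash(key) % number_of_partitions.
    public static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        String key = "user-42";
        // The same key may land on a different partition after the count
        // changes from 3 to 4; existing data stays where it was written.
        System.out.println("3 partitions -> " + partitionFor(key, 3));
        System.out.println("4 partitions -> " + partitionFor(key, 4));
    }
}
```

So, as we read it, a consumer that assumes "all messages for this key are in one partition" can see the key split across two partitions after the change.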
- The Kafka producer fails to send messages when a broker is down.
We have a topic with 3 partitions and 2 replicas, and the Kafka cluster has 3 brokers. But when one broker goes down, exceptions occur:
kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]
kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]
The same problem also happens when new brokers are added: we have to add the new brokers' hostnames and ports to the "metadata.broker.list" configuration in the producer and restart it.
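From reading the 0.8 producer configuration documentation, the following settings appear to control the retry and metadata-refresh behavior involved here (defaults shown; we have not yet verified that tuning them fixes our problem):

```xml
<!-- Assumed-relevant 0.8 producer settings; values are the documented defaults -->
<entry key="message.send.max.retries" value="3" />
<entry key="retry.backoff.ms" value="100" />
<entry key="topic.metadata.refresh.interval.ms" value="600000" />
```

The "Failed to send messages after 3 tries" in the stack trace seems to match the default of message.send.max.retries.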
We are using the high-level API, and the Kafka version is:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.0</version>
</dependency>
producer configuration:
<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />
consumer configuration:
<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />
The producer and consumer code follow these examples: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example and https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example