4 votes

Thanks for your patience.

  1. After adding partitions to a topic online, the Kafka consumer stops reading messages and no exceptions are thrown. The consumer just blocks, and each time we have to restart it. This seems unreasonable, and I cannot find any docs about it.

Moreover, the consumer thread does not resume when an error occurs while processing a message. Our consumer reads messages and inserts them into MySQL. Once the network failed and the consumer could not connect to MySQL, it blocked and stopped reading messages until we restarted it.
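
For reference, each consumer thread is roughly like the sketch below (simplified; MessageWorker and insertIntoMySql are placeholder names for our actual worker and MySQL write, not real classes):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class MessageWorker implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;  // one stream from createMessageStreams()

    public MessageWorker(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String message = new String(it.next().message());
            // if this call throws or hangs (e.g. MySQL is unreachable),
            // the loop stops and no further messages are read
            insertIntoMySql(message);
        }
    }

    private void insertIntoMySql(String message) {
        // placeholder: the real code writes the message to MySQL
    }
}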

  2. What happens to old data and new data when adding partitions? The docs (https://kafka.apache.org/documentation.html#basic_ops_modify_topic) say:

"Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data so this may disturb consumers if they rely on that partition. That is if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way."

What does "not attempt to automatically redistribute data" mean? Does it mean the old data stays unchanged and new data will not be sent to the added partitions?

  3. The Kafka producer fails to send messages when a broker is down.

We have a topic with 3 partitions and 2 replicas. The Kafka cluster has 3 brokers. But when one broker goes down, exceptions occur:

kafka.producer.async.ProducerSendThread.error():103: - Error in handling batch of 65 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.2.0.jar:na]
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) [kafka_2.9.2-0.8.2.0.jar:na]
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) [kafka_2.9.2-0.8.2.0.jar:na]
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) [kafka_2.9.2-0.8.2.0.jar:na]
    at scala.collection.immutable.Stream.foreach(Stream.scala:526) [scala-library-2.9.2.jar:na]
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) [kafka_2.9.2-0.8.2.0.jar:na]
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [kafka_2.9.2-0.8.2.0.jar:na]

kafka.producer.async.DefaultEventHandler.error():97: - Failed to send requests for topics risk_acts with correlation ids in [433266,433395]

The same problem also happens when new brokers are added: we have to add the new brokers' hostnames and ports to the "metadata.broker.list" configuration in the producer and restart it.

We are using the high-level API, and the Kafka version is:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.0</version>
</dependency>

producer configuration:

<entry key="metadata.broker.list" value="${metadata.broker.list}" />
<entry key="serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="key.serializer.class" value="kafka.serializer.StringEncoder" />
<entry key="request.required.acks" value="-1" />
<entry key="producer.type" value="async" />
<entry key="queue.enqueue.timeout.ms" value="-1" />
<entry key="compression.codec" value="1" />

consumer configuration:

<entry key="zookeeper.connect" value="${zookeeper.connect}" />
<entry key="group.id" value="${kafka.consumer.group.id}" />
<entry key="zookeeper.session.timeout.ms" value="40000" />
<entry key="rebalance.backoff.ms" value="10000" />
<entry key="zookeeper.sync.time.ms" value="2000" />
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="auto.offset.reset" value="smallest" />

The producer and consumer code follow these examples: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example and https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example


2 Answers

1 vote

As for #2: say your key is a Long and you have 10 partitions. One way to distribute the Long amongst the partitions is simply a modulus operation, key % num_partitions. But now think about what happens when you add partitions: the already-written messages sit in the "wrong" partition based on the current value of num_partitions. What the docs are saying is that Kafka doesn't re-partition anything for you automatically.
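
A tiny sketch of that effect (the key and partition counts are made up):

public class PartitionShuffleDemo {
    public static void main(String[] args) {
        long key = 42L;
        int partitionsBefore = 10;
        int partitionsAfter  = 12;   // two partitions added later

        // Old messages stay in the partition they were written to ...
        System.out.println("before: " + (key % partitionsBefore));  // 2
        // ... while new messages for the same key land somewhere else.
        System.out.println("after:  " + (key % partitionsAfter));   // 6
    }
}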

1 vote

First, understand the difference between adding partitions and repartitioning.

In the case of repartitioning: existing data is moved from one partition to another.

When adding extra partitions: old data stays where it is, and new data is distributed across all the partitions, including the new ones.

In both cases, the group coordinator sends a signal to all the consumers with the new list of partitions; the consumers then rebalance and finally connect to all the partitions.

In your case you might be facing some other issue that is not related to increasing partitions.

Maybe enable the debug logs on the servers; you will see additional details.