
I am working with the C++ Kafka client librdkafka. The library is here: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_example.cpp. My program writes 2,000,000 messages to the broker, and I restarted the broker while it was running. Sometimes every message is delivered. Other times about 100,000 messages fail to be delivered. Since queue.buffering.max.messages=100000, it seems that all the messages in the out queue were lost. The error is: RdKafka::Message delivery report: Local: Unknown partition.

I have since found two more problems: (1) sometimes about 200 messages are sent to the broker twice; (2) sometimes a message has already reached the broker, but dr_cb() is still called and reports that delivery failed. I am trying to figure out whether the problem lies in the broker or in the client. Has anyone seen similar problems? I need reliable transmission and reliable delivery reports between the client and the broker. I am now considering switching to the C client, but I am not sure whether the same problem will happen there.

The broker log is:

[2015-07-21 17:48:33,471] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)

[2015-07-21 17:48:33,717] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

[2015-07-21 17:48:33,718] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 5017; ClientId: rdkafka; Topics: test (kafka.server.KafkaApis)

kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0

at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)

at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)

at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)

at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)

at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)

at scala.collection.immutable.Set$Set1.foreach(Set.scala:86)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)

at scala.collection.immutable.Set$Set1.scala$collection$SetLike$$super$map(Set.scala:73)

at scala.collection.SetLike$class.map(SetLike.scala:93)

at scala.collection.immutable.Set$Set1.map(Set.scala:73)

at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)

at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)

at kafka.server.KafkaApis.handle(KafkaApis.scala:62)

at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)

at java.lang.Thread.run(Thread.java:745)

[2015-07-21 17:48:33,743] INFO Registered broker 0 at path /brokers/ids/0 with address cyclops-9803:9092. (kafka.utils.ZkUtils$)

[2015-07-21 17:48:33,759] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

[2015-07-21 17:48:33,803] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

[2015-07-21 17:48:33,858] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34,000] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)

[2015-07-21 17:48:34,017] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

My producer configuration is:

Global config

client.id=rdkafka

metadata.broker.list=localhost:9092

message.max.bytes=4000000

receive.message.max.bytes=100000000

metadata.request.timeout.ms=900000

topic.metadata.refresh.interval.ms=-1

topic.metadata.refresh.fast.cnt=10

topic.metadata.refresh.fast.interval.ms=250

topic.metadata.refresh.sparse=false

socket.timeout.ms=300000

socket.send.buffer.bytes=0

socket.receive.buffer.bytes=0

socket.keepalive.enable=false

socket.max.fails=10

broker.address.ttl=300000

broker.address.family=any

statistics.interval.ms=0

error_cb=0x5288a60

stats_cb=0x5288ba0

log_cb=0x54942a0

log_level=6

socket_cb=0x549e6c0

open_cb=0x54acf90

opaque=0x9167898

internal.termination.signal=0

queued.min.messages=100000

queued.max.messages.kbytes=1000000

fetch.wait.max.ms=100

fetch.message.max.bytes=1048576

fetch.min.bytes=1

fetch.error.backoff.ms=500

queue.buffering.max.messages=100000

queue.buffering.max.ms=1000

message.send.max.retries=10

retry.backoff.ms=100

compression.codec=none

batch.num.messages=1000

delivery.report.only.error=true

Topic config

request.required.acks=1

enforce.isr.cnt=0

request.timeout.ms=5000

message.timeout.ms=300000

produce.offset.report=false

auto.commit.enable=true

auto.commit.interval.ms=60000

auto.offset.reset=largest

offset.store.path=.

offset.store.sync.interval.ms=-1

offset.store.method=file

consume.callback.max.messages=0

The consumer output is:

[2015-07-22 20:57:21,052] WARN Fetching topic metadata with correlation id 1 for topics [Set(test)] from broker [id:0,host:cyclops-9803,port:9092] failed (kafka.client.ClientUtils$)

java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)

at kafka.producer.SyncProducer.send(SyncProducer.scala:113)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)

at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-07-22 20:57:21,073] WARN [console-consumer-88480_cyclops-9803-1437598630859-416c8038-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)

kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cyclops-9803,port:9092)] failed

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)

at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)

at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)

at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)

at kafka.producer.SyncProducer.send(SyncProducer.scala:113)

at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)

Any suggestions are welcome. Thanks.

This seems like a broker issue where it might intermittently fail to respond appropriately to MetadataRequests during startup or failover, thus providing invalid or incomplete information back to the client. - Edenhill
Sometimes, when the broker comes back up after being down, all the messages in the out queue fail to be delivered. The error is "unknown partition". It seems that for some time the client has no idea of the broker's current status and thinks partition 0 is unavailable. In that case, though, the client should keep retrying, because I set the retry count to a huge number. If the broker is away, the client should keep trying. But I have no idea whether the client actually retried as many times as I configured. - BAE

1 Answer

In async mode, the client application has to handle this kind of problem itself. I don't know of a way to guarantee that every message in the out queue is delivered to the broker. What we can do is keep the message in the out queue: when delivery fails, dr_cb() is called, and in that callback we put the message into the out queue again. This may not be the best way, but it is what I am using now.
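To make the re-enqueue idea concrete, here is a minimal sketch in plain C++. It does not use the librdkafka API: `PendingMessage`, `RetryingSender`, and the `send` callback are hypothetical names invented for illustration, and a `false` return from `send` stands in for dr_cb() reporting a failed delivery. The cap on attempts is my own assumption, added so that a message that can never be delivered does not loop forever.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a queued message; not a librdkafka type.
struct PendingMessage {
    std::string payload;
    int attempts;  // number of delivery attempts made so far
};

// Hypothetical helper that puts failed messages back into the out queue,
// up to max_attempts tries per message.
class RetryingSender {
public:
    using SendFn = std::function<bool(const std::string&)>;

    RetryingSender(SendFn send, int max_attempts)
        : send_(std::move(send)), max_attempts_(max_attempts) {
        assert(max_attempts_ > 0);
    }

    void enqueue(const std::string& payload) {
        queue_.push_back(PendingMessage{payload, 0});
    }

    // Drain the out queue. A failed send plays the role of dr_cb() being
    // called with an error: the message is re-enqueued at the back unless
    // it has used up its attempts, in which case it is recorded as dropped.
    void drain() {
        while (!queue_.empty()) {
            PendingMessage msg = queue_.front();
            queue_.pop_front();
            ++msg.attempts;
            if (send_(msg.payload)) {
                ++delivered_;
            } else if (msg.attempts < max_attempts_) {
                queue_.push_back(msg);
            } else {
                dropped_.push_back(msg.payload);
            }
        }
    }

    std::size_t delivered() const { return delivered_; }
    const std::deque<std::string>& dropped() const { return dropped_; }

private:
    SendFn send_;
    int max_attempts_;
    std::deque<PendingMessage> queue_;
    std::deque<std::string> dropped_;
    std::size_t delivered_ = 0;
};
```

Two caveats follow from the problems described in the question. Re-enqueueing at the back changes message order, and if dr_cb() reports a failure for a message that actually reached the broker, resending it produces a duplicate. So this approach gives at-least-once delivery at best, and consumers would need to tolerate duplicates.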