We are observing very poor performance from the Java Kafka producer 0.9 client when sending small messages. Records are not being accumulated into larger request batches, so each small record is sent in its own request.
What is wrong with our client configuration? Or is this some other issue?
Using Kafka Client 0.9.0.0. We did not see any related issues in the fixed or unresolved lists for the unreleased 0.9.0.1 or 0.9.1 releases, so we are focusing on our client configuration and the server instance.
We understand that linger.ms should cause the client to accumulate records into a batch. We set linger.ms to 10 (and also tried 100 and 1000), but none of these values caused records to accumulate into a batch. With a record size of about 100 bytes and a 16 KB batch buffer, we would have expected about 160 messages to be sent in a single request.
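For reference, here is the back-of-the-envelope arithmetic behind our ~160-records-per-batch expectation (a rough estimate that ignores per-record framing overhead; 16384 is the default batch.size in Kafka 0.9 and ~100 bytes is our serialized record size):

```java
public class BatchEstimate {
    // Rough records-per-batch estimate; ignores per-record framing overhead.
    static int recordsPerBatch(int batchSizeBytes, int recordBytes) {
        return batchSizeBytes / recordBytes;
    }

    public static void main(String[] args) {
        // 16384-byte batch buffer, ~100-byte records -> 163 records per full batch
        System.out.println("Expected records per full batch: " + recordsPerBatch(16384, 100));
    }
}
```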
The trace at the client seems to indicate that the partition may be full, despite having allocated a fresh Bluemix Messaging Hub (Kafka Server 0.9) service instance. The test client is sending multiple messages in a loop with no other I/O.
The log shows a repeating sequence with a suspect line: "Waking up the sender since topic mytopic partition 0 is either full or getting a new batch".
So the newly allocated partition should be essentially empty in our test case; why, then, would the producer client be getting a new batch?
2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - Allocating a new 16384 byte message buffer for topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Received produce response from node 0 with correlation id 11
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Send returned metadata: Topic='mytopic', Partition=0, Offset=130
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'

Log entries repeat like the above for each record sent.
We provided the following properties file:
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - acks=-1
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - client.id=ExploreProducer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - security.protocol=SASL_SSL

Plus we added linger.ms=10 in code.
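A minimal sketch of how we assemble the producer configuration in code (the class name `ProducerConfigSketch` is just for illustration; the SASL/SSL properties from the file above are omitted for brevity, and the `KafkaProducer` construction is shown as a comment since it requires the kafka-clients jar):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        // Values loaded from kafka-producer.properties in our client
        // (one broker shown here; we list all five, plus the SASL/SSL settings):
        props.put("bootstrap.servers", "kafka01-prod01.messagehub.services.us-south.bluemix.net:9094");
        props.put("acks", "-1");
        props.put("client.id", "ExploreProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Added in code, to request record batching:
        props.put("linger.ms", "10");
        return props;
    }

    public static void main(String[] args) {
        // Producer<byte[], byte[]> producer = new KafkaProducer<>(producerProps());
        System.out.println("linger.ms = " + producerProps().getProperty("linger.ms"));
    }
}
```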
The Kafka client logs the expanded/merged configuration (including the linger.ms setting):
2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [hidden]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = ExploreProducer
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    acks = -1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 10
The Kafka metrics after sending 100 records:
Duration for 100 sends: 8787 ms. Sent 7687 bytes.
batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]
batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]
buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]
buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]
buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]
bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]
byte-rate = 291.8348916277093 []
compression-rate = 0.0 []
compression-rate-avg = 0.0 [The average compression rate of record batches.]
connection-close-rate = 0.0 [Connections closed per second in the window.]
connection-count = 2.0 [The current number of active connections.]
connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]
incoming-byte-rate = 10.342564641029007 []
io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]
io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]
io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]
io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]
network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]
outgoing-byte-rate = 451.2298783403283 []
produce-throttle-time-avg = 0.0 [The average throttle time in ms]
produce-throttle-time-max = 0.0 [The maximum throttle time in ms]
record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]
record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]
record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]
record-retry-rate = 0.0 []
record-send-rate = 2.65611304417116 [The average number of records sent per second.]
record-size-avg = 97.87 [The average record size]
record-size-max = 98.0 [The maximum record size]
records-per-request-avg = 1.0 [The average number of records per request.]
request-latency-avg = 0.0 [The average request latency in ms]
request-latency-max = 74.0 []
request-rate = 2.6468892499606897 [The average number of requests sent per second.]
request-size-avg = 42.0 [The average size of all requests in the window.]
request-size-max = 170.0 [The maximum size of any request sent in the window.]
requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]
response-rate = 2.651196976060479 [The average number of responses received per second.]
select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]
waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]
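The headline numbers above are consistent with one request per record (records-per-request-avg = 1.0). A back-of-the-envelope calculation from our run's totals, not a Kafka metric, shows the average per-send cost:

```java
public class SendCost {
    // Average wall-clock cost per send, given total duration and record count.
    static double avgSendMillis(long totalMillis, int records) {
        return (double) totalMillis / records;
    }

    public static void main(String[] args) {
        // 100 sends took 8787 ms in our run, i.e. roughly a full request
        // round trip per record instead of many records sharing one batch.
        System.out.println("Avg ms per send: " + avgSendMillis(8787, 100));
    }
}
```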
Thanks