1
votes

I have updated to release 1.0.1 since 1.0.0 has OOM issue.

I set up cluster which have four brokers.

There are about 150 topics, and about total 4000 partitions, ReplicationFactor is 2.
connctors are used to write/read data to/from brokers.
connecotr version is 0.10.1.
The average message size is 500B, and around 60000 messages per seconds.
one of the broker keep report OOM, and can't handle request like:

    [2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[
    {partition=16,fetch_offset=51198,max_bytes=60728640} ,{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} (kafka.server.KafkaApis)
    java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
    at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523)
    at scala.Option.map(Option.scala:146)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513)
    at scala.Option.flatMap(Option.scala:171)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574)
    at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041)
    at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54)
    at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593)
    at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:592)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
    at kafka.server.KafkaApis$$anonfun$handleFetchRequest$4.apply(KafkaApis.scala:609)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:601)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:99)

and then lots of shrink ISR ( this broker is 1001)

    018-03-24 13:43:00,285] INFO [Partition gnup.source.offset.storage.topic-5 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
018-03-24 13:43:00,286] INFO [Partition s1mme.data-72 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
018-03-24 13:43:00,286] INFO [Partition gnup.sink.status.storage.topic-17 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
018-03-24 13:43:00,287] INFO [Partition probessgsniups.sink.offset.storage.topic-4 broker=1001] Shrinking ISR from 1001,1002 to 1001 (kafka.cluster.Partition)
018-03-24 13:43:01,447] INFO [GroupCoordinator 1001]: Stabilized group connect-VOICE_1_SINK_CONN generation 26 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
i can't dump the heap since each time I run:
[root@sslave1 kafka]# jcmd 55409 GC.heap_dump /home/ngdb/heap_dump175.hprof
55409:
    com.sun.tools.attach.AttachNotSupportedException: Unable to open socket file: target process not responding or HotSpot VM not loaded  
    at sun.tools.attach.LinuxVirtualMachine.(LinuxVirtualMachine.java:106)
    at  sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:63)
    at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:208)
    at sun.tools.jcmd.JCmd.executeCommandForPid(JCmd.java:147)
    at sun.tools.jcmd.JCmd.main(JCmd.java:131)

the JVM parameter is:

    -XX:+ExplicitGCInvokesConcurrent -XX:GCLogFileSize=104857600 -XX:InitialHeapSize=2147483648 -XX:InitiatingHeapOccupancyPercent=35  -XX:+ManagementServer -XX:MaxGCPauseMillis=20 -XX:MaxHeapSize=4294967296   -XX:NumberOfGCLogFiles=10 -XX:+PrintGC -XX:+PrintGCDateStamps   -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+UseCompressedClassPointers   -XX:+UseCompressedOops -XX:+UseG1GC -XX:+UseGCLogFileRotation

when I use -XX:mx=2G, four brokers reported OOM,
after i increated it to 4G, only one brokers reported OOM.
Ticker is also raised in https://issues.apache.org/jira/browse/KAFKA-6709.

2
when i tested it under 0.10.2.2, same throughput, heap size is -XX:mx=1G, -XX:ms=1G, there is no OOM reported. - user1654115

2 Answers

0
votes

Between the 0.10.X and >= 0.11.X Kafka versions, the message format changed.

So when running a recent broker (>= 0.11) with older clients (<=0.10), brokers have to down convert messages before sending them back to clients. This is documented in the upgrades notes: http://kafka.apache.org/documentation/#upgrade_11_message_format.

You can see in the stack trace this is indeed happening:

at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)

This comes with a performance hit and also increase the amount of memory required as brokers need to allocate new buffers to create down converted messages.

You should try upgrading your clients to the same release as the brokers. Also considering how small your current heap is (4GB), increasing it is likely to help.

Another option is to force the newer brokers to use the older message format (using log.message.format.version) but that would prevent you from using some of the newer features.

0
votes

I meet the same problem, and i fixed it by restarting kafka process.