I just finished setting up my Kafka cluster using Confluent Platform 6.0.0 (Apache Kafka 2.6.0). The Kafka brokers are deployed in Kubernetes. Producing messages to a new topic works fine as long as they are not compressed.
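For context, the producer enables compression via the standard `compression.type` client setting. My configuration is essentially the following (the bootstrap address and serializers shown here are placeholders, not my exact values):

```properties
# Placeholder broker address for the in-cluster Kafka service
bootstrap.servers=kafka:9092
# This is the setting that triggers the problem; with it removed,
# producing to the same topic works fine
compression.type=snappy
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```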
However, when I tried to produce a snappy-compressed message, an error was returned. I then looked at the broker logs and found the following exception:
[2020-11-24 13:14:37,834] ERROR (data-plane-kafka-request-handler-0:Logging) [ReplicaManager broker=1] Error processing append operation on partition customers-2
org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:92)
at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261)
at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:340)
at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:401)
at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:394)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:394)
at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
at kafka.log.Log.$anonfun$append$2(Log.scala:1095)
at kafka.log.Log.append(Log.scala:2340)
at kafka.log.Log.appendAsLeader(Log.scala:1019)
at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:984)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:972)
at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:883)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.mutable.HashMap.map(HashMap.scala:34)
at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:871)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:571)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:605)
at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:145)
at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90)
... 24 more
Additional info: in Kubernetes I have configured the broker container so that it has no permission to write files to the local filesystem (a read-only root filesystem). I'm not sure whether this is relevant, but perhaps the Snappy class needs to write somewhere in order to initialize successfully?
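For completeness, this is roughly the relevant part of the broker pod spec (simplified; container name and image are illustrative, not my exact manifest):

```yaml
# Simplified excerpt of the broker container spec
containers:
  - name: kafka-broker          # illustrative name
    image: confluentinc/cp-server:6.0.0
    securityContext:
      # This is the restriction mentioned above: the container
      # cannot write anywhere on its local filesystem
      readOnlyRootFilesystem: true
```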
Why does Kafka fail to handle snappy-compressed messages?