0 votes

I have a 3-node Zookeeper cluster (version 3.4.11) and a 2-node Kafka cluster (version 0.11.3). We wrote a producer that sends messages to specific topics and partitions of the Kafka cluster (I have done this before and the producer is tested). Here are the broker configs:

broker.id=1
listeners=PLAINTEXT://node1:9092
num.partitions=24
delete.topic.enable=true
default.replication.factor=2
log.dirs=/data
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
log.retention.hours=168
zookeeper.session.timeout.ms=40000
zookeeper.connection.timeout.ms=10000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2

In the beginning there are no topics on the brokers; they will be created automatically. When I start the producer, the Kafka cluster shows strange behavior:

1- It creates all the topics, but although the producing rate is only 10KB per second, in less than one minute the log directory of each broker goes from zero to 9.0 GB of data, and all brokers shut down (because the log dir runs out of capacity).
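
For anyone reproducing this, a quick way to watch the log dir filling up (just a sketch, assuming the log.dirs=/data path from the config above):

watch -n 5 'du -sh /data && df -h /data'
du -sh /data/* | sort -h | tail     # largest topic-partition directories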

2- As soon as producing starts, I try to consume data using the console consumer and it just errors:

WARN Error while fetching metadata with correlation id 2 : {Topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
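
The console consumer was invoked roughly like this (a sketch, not the exact command; the bootstrap server comes from the listeners config above), and whether a leader was assigned can be checked with kafka-topics.sh:

kafka-console-consumer.sh --bootstrap-server node1:9092 --topic Topic1 --from-beginning
kafka-topics.sh --zookeeper zoo1:2181 --describe --topic Topic1     # check partition leaders

(LEADER_NOT_AVAILABLE can be transient right after a topic is auto-created, while leaders are being elected, but here it kept repeating.)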

3- Here is the error that repeatedly appears in the brokers' logs:

INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: Topic6-6. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
WARN Newly rolled segment file 00000000000000000000.log already exists; deleting it first (kafka.log.Log)
WARN Newly rolled segment file 00000000000000000000.index already exists; deleting it first (kafka.log.Log)
WARN Newly rolled segment file 00000000000000000000.timeindex already exists; deleting it first (kafka.log.Log)
ERROR [Replica Manager on Broker 1]: Error processing append operation on partition Topic6-6 (kafka.server.ReplicaManager)
kafka.common.KafkaException: Trying to roll a new log segment for topic partition Topic6-6 with start offset 0 while it already exists.

After many repetitions of the above log messages we have:

ERROR [ReplicaManager broker=1] Error processing append operation on partition Topic24-10 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.InvalidOffsetException: Attempt to append an offset (402) to position 5 no larger than the last offset appended (402)

And at the end (when there is no space left in the log dir) it errors:

FATAL [Replica Manager on Broker 1]: Error writing to highwatermark file:  (kafka.server.ReplicaManager)
java.io.FileNotFoundException: /data/replication-offset-checkpoint.tmp (No space left on device)

and shuts down!

4- I set up a new single-node Kafka (version 0.11.3) on another machine and it works well using the same producer and the same Zookeeper cluster.

5- I turned one of the two Kafka brokers off, and using just one broker (of the cluster) it behaves the same as the two-node Kafka cluster.

What is the problem?

UPDATE 1: I tried Kafka version 2.1.0, but got the same result!


UPDATE 2: I found the root of the problem. While producing, I create 25 topics, each with 24 partitions. Surprisingly, each topic, right after creation (using the kafka-topics.sh command and with no data stored yet), occupies 481MB of space! For example, in the log directory of topic "topic20", each partition directory contains the following files, totaling 21MB:

00000000000000000000.index (10MB)  00000000000000000000.log(0MB)  00000000000000000000.timeindex(10MB)  leader-epoch-checkpoint(4KB)

and Kafka writes the following lines for each topic-partition in the server.log file:

[2019-02-05 10:10:54,957] INFO [Log partition=topic20-14, dir=/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2019-02-05 10:10:54,957] INFO [Log partition=topic20-14, dir=/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
[2019-02-05 10:10:54,958] INFO Created log for partition topic20-14 in /data with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2019-02-05 10:10:54,958] INFO [Partition topic20-14 broker=0] No checkpointed highwatermark is found for partition topic20-14 (kafka.cluster.Partition)
[2019-02-05 10:10:54,958] INFO Replica loaded for partition topic20-14 with initial high watermark 0 (kafka.cluster.Replica)
[2019-02-05 10:10:54,958] INFO [Partition topic20-14 broker=0] topic20-14 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

There are no errors in the server log, and I can even consume data if I produce to the topic. But the total log directory space is 10GB, while Kafka needs 12025MB for the 25 topics in my scenario; that exceeds the total directory space, so Kafka errors out and shuts down!
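
One way to verify where the space goes (a sketch, assuming the /data log dir and the <topic>-<partition> directory naming shown in the logs above):

du -sh /data/topic20-*               # about 21MB per partition directory
du -sch /data/topic20-* | tail -1    # about 481MB for the 24 partitions of topic20
du -sh /data                         # approaches the 12025MB needed for all 25 topics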

Just as a test, I set up another Kafka broker (namely Broker2) using the same Zookeeper cluster, and creating a new topic with 24 partitions there occupies only 100KB for all the empty partitions!

So I'm really confused! Broker1 and Broker2 run the same Kafka version (0.11.3); only the OS and file system differ:

On Broker1 (occupies 481MB for a new topic):

  • OS: CentOS 7, file system: XFS

On Broker2 (occupies 100KB for a new topic):

  • OS: Ubuntu 16.04, file system: ext4
Comments:

How much data are you producing? Do you bear in mind that you have set a replication factor of 2 (so every broker will own all the data on a two-node cluster)? Can you show us the producer code? – TobiSH

@TobiSH The producing rate is 10KB per second, and I know that with a replication factor of 2 each node will hold the full data. – Soheil Pourbafrani

Did you check the data in the log dir? What does it contain? Duplicates of the data you send to the broker? – TobiSH

Maybe just a typo? The two brokers have different ids, right? Should be broker.id=1 and broker.id=2. – TobiSH

The ids of the brokers are different, and in the log directory there is a directory for each topic-partition, for example, Topic6-18. – Soheil Pourbafrani

1 Answer

4 votes

  • Why does Kafka preallocate 21MB for each partition?

This is normal behavior: the preallocated size of the indices is controlled by the segment.index.bytes property, whose default value is 10485760 bytes (10MB). That's why the indices in each partition directory allocate 10MB:

00000000000000000000.index (10MB)  
00000000000000000000.log(0MB)  
00000000000000000000.timeindex(10MB)  
leader-epoch-checkpoint(4KB)
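
If the preallocated indices are too large for a small disk, segment.index.bytes can also be lowered per topic; a sketch using the 0.11-era tooling and the zookeeper.connect string from the question (1MB is just an illustrative value, not a recommendation):

kafka-configs.sh --zookeeper zoo1:2181 --alter --entity-type topics --entity-name topic20 --add-config segment.index.bytes=1048576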

On the other hand, the Kafka documentation says about that property:

We preallocate this index file and shrink it only after log rolls.

But in my case it never shrank the indices. After a lot of searching I found out that some builds of Java 8 (update 192 in my case) have a bug in handling many small files, and it is fixed in update 202. So I updated my Java version to update 202 and that solved the problem.
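
A quick way to check whether a preallocated index file actually takes physical space (topic20-0 is just one example partition directory): ls reports the 10MB apparent length either way, while du reports the real allocation, which stays close to zero when the file is sparse and shows the full 10MB in the broken setup described above.

ls -lh /data/topic20-0/00000000000000000000.index    # apparent size: 10M
du -h /data/topic20-0/00000000000000000000.index     # physical allocation: near 0 when sparse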