0
votes

I have 3 different system. each system contains one zookeepr and one kafka server. using this i created one cluster.

Properties configuration is given below.

zookeeper1.properties

dataDir=/tmp/zookeeper1
clientPort=2181
maxClientCnxns=0
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
tickTime=2000
initLimit=5
syncLimit=2

zookeeper2.properties

dataDir=/tmp/zookeeper2
clientPort=2181
maxClientCnxns=0
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
tickTime=2000
initLimit=5
syncLimit=2

zookeeper3.properties

dataDir=/tmp/zookeeper3
clientPort=2181
maxClientCnxns=0
server.1=x.x.x.x:2888:3888
server.2=x.x.x.x:2888:3888
server.3=x.x.x.x:2888:3888
tickTime=2000
initLimit=5
syncLimit=2

server1.properties

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://x.x.x.x:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs1
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
default.replication.factor=3
replica.lag.time.max.ms=30000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

server2.properties

broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://x.x.x.x:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs2
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
default.replication.factor=3
replica.lag.time.max.ms=30000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

server3.properties

broker.id=3
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://x.x.x.x:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
default.replication.factor=3
replica.lag.time.max.ms=30000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

myid1,myid2,myid3 file is created in respected folder.

using this configuration cluster is working properly. when cluster is working application is started successfully.

But after some time if any reason any one system is shut down abnormally (may be electricity) rebalancing of partition and topic is done properly but application is not comes up again when one node is down.

error occurs from kafka library is given below.

[TRACE] 2020-03-20 15:07:54.970 [main] [COM-192.168.33.221 ][BS_KAFKA_INI ] kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:259) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:255) at kafka.zookeeper.ZooKeeperClient.(ZooKeeperClient.scala:113) at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1858) at kafka.zk.KafkaZkClient.apply(KafkaZkClient.scala) at com.panamax.baseserver.kafka.initializer.PanamaxBaseKafkaInitializer.createKafkaTopics(PanamaxBaseKafkaInitializer.java:152) at com.panamax.baseserver.kafka.initializer.PanamaxBaseKafkaInitializer.initialize(PanamaxBaseKafkaInitializer.java:67) at com.panamax.mobifinelite.common.core.factory.CommonKafkaEntityFactory.initialize(CommonKafkaEntityFactory.java:95) at com.panamax.mobifinelite.common.fw.initializer.KafkaEntityFactoryInitializer.initialize(KafkaEntityFactoryInitializer.java:26) at com.panamax.mobifinadapter.adapter.core.intializer.AdapterKafkaEntityFactoryInitializer.initialize(AdapterKafkaEntityFactoryInitializer.java:54) at com.panamax.mobifinelite.common.fw.service.MobifinEliteService.initializeService(MobifinEliteService.java:116) at com.panamax.mobifinadapter.adapter.core.fw.service.MobifinAdapterService.start(MobifinAdapterService.java:61) at com.panamax.baseserver.services.PanamaxBaseService.startService(PanamaxBaseService.java:75) at com.panamax.baseserver.services.PanamaxServiceManager.startService(PanamaxServiceManager.java:155) at com.panamax.baseserver.services.PanamaxServiceManager.startAllValidService(PanamaxServiceManager.java:172) at com.panamax.baseserver.server.PanamaxBaseServer.startAllServices(PanamaxBaseServer.java:153) at com.panamax.baseserver.server.PanamaxServerManager.startServer(PanamaxServerManager.java:93) at com.panamax.mobifinadapter.adapter.web.config.MobifinAdapterMain.main(MobifinAdapterMain.java:52)

i am using kafka_2.12-2.4.1

some time this error is also occurs.

[TRACE] 2020-03-20 15:08:46.445 [main] [COM-192.168.33.221 ][BS_KAFKA_INI ] org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.

2
is this logs from producer or consumer? What is bootstrap.servers in producer and consumer side? Producer /Consumer configs can be helpful. - H.Ç.T
InvalidReplicationFactorException can be related to allow.auto.create.topics parameter which is true by default. Are your producer try to send data to topic which is not exists? - H.Ç.T

2 Answers

1
votes

You're storing data in /tmp for every service

When broker or ZK restart, it'll be wiped and likely fail to rejoin the cluster

Therefore your clients will start failing.

Additionally, don't set default partition size to three. This is basically saying you're guaranteed to always have three healthy brokers at any moment in time, which seems unlikely when that's your whole cluster

Disable auto topic creation and make topics yourself

0
votes

In your config you have:

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
default.replication.factor=3

which forces any message in a topic (with default configuration) require 3 copy and with one server down this is impossible.