1
votes

I am new to Kafka and trying to install and run console consumer but getting error java.lang.IllegalStateException: There are no in-flight requests for node -1

Environment I tried on is as below

Kafka Version kafka_2.13-2.6.0
MacOS java11 Fails
MacOS java 1.8 Fails
Windows 10 Java11 Success

Below are the steps in details which I am performing. The same steps works on Windows.

STEP 1 Download KAFKA

I just downloaded the Kafka from https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz

STEP 2 Start zookeeper service, which works fine.

I am starting zookeeper with below command

bin % zookeeper-server-start.sh ../config/zookeeper.properties

here is the zookeeper log

[2020-08-30 14:28:52,234] INFO Server environment:java.io.tmpdir=/var/folders/q7/khp8p9k14rzfs_zl52m57hlr0000gn/T/ (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:os.name=Mac OS X (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:os.arch=x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:os.version=10.15.6 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:user.name=jigarnaik (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:user.home=/Users/jigarnaik (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:user.dir=/Users/jigarnaik/Documents/kafka_2.13-2.6.0/bin (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:os.memory.free=496MB (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:os.memory.max=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,234] INFO Server environment:os.memory.total=512MB (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,235] INFO minSessionTimeout set to 6000 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,235] INFO maxSessionTimeout set to 60000 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,236] INFO Created server with tickTime 3000 minSessionTimeout 6000 maxSessionTimeout 60000 datadir /tmp/zookeeper/version-2 snapdir /tmp/zookeeper/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
[2020-08-30 14:28:52,243] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
[2020-08-30 14:28:52,247] INFO Configuring NIO connection handler with 10s sessionless connection timeout, 2 selector thread(s), 32 worker threads, and 64 kB direct buffers. (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2020-08-30 14:28:52,254] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2020-08-30 14:28:52,268] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2020-08-30 14:28:52,271] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-08-30 14:28:52,273] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2020-08-30 14:28:52,286] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
load: 1.58  cmd: java 17764 waiting 0.68u 0.11s

STEP 3 start Kafka - the logs look fine to me.

After that I am starting Kafka using below command

bin % kafka-server-start.sh ../config/server.properties

Kafka startup looks fine with below log tail

[2020-08-30 14:33:41,708] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-08-30 14:33:41,709] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-08-30 14:33:41,709] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-08-30 14:33:41,728] INFO [ExpirationReaper-0-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-08-30 14:33:41,757] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2020-08-30 14:33:41,770] INFO [SocketServer brokerId=0] Starting socket server acceptors and processors (kafka.network.SocketServer)
[2020-08-30 14:33:41,773] INFO [SocketServer brokerId=0] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2020-08-30 14:33:41,773] INFO [SocketServer brokerId=0] Started socket server acceptors and processors (kafka.network.SocketServer)
[2020-08-30 14:33:41,775] INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-08-30 14:33:41,775] INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser)
[2020-08-30 14:33:41,775] INFO Kafka startTimeMs: 1598769221773 (org.apache.kafka.common.utils.AppInfoParser)
[2020-08-30 14:33:41,776] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2020-08-30 14:33:41,808] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(word-count-output-0, word-count-input-0) (kafka.server.ReplicaFetcherManager)
[2020-08-30 14:33:41,815] INFO [Partition word-count-output-0 broker=0] Log loaded for partition word-count-output-0 with initial high watermark 0 (kafka.cluster.Partition)
[2020-08-30 14:33:41,821] INFO [Partition word-count-input-0 broker=0] Log loaded for partition word-count-input-0 with initial high watermark 0 (kafka.cluster.Partition)

STEP 4 Create topic, topic created successfully. after which I. am creating topic

bin % kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic my-topic
Created topic my-topic.

list topic

bin % kafka-topics.sh \
    --list \
    --zookeeper localhost:2181
my-topic
word-count-input
word-count-output

STEP 5 Run console producer, fails with below error.

but when I try to start the console producer using below command, I am getting exception java.lang.IllegalStateException: There are no in-flight requests for node -1 The same step when I run on windows it works fine, I am getting issue only on Mac.

bin % kafka-console-producer.sh \
    --topic word-count-input \
    --bootstrap-server localhost:9092
>[2020-08-30 14:35:17,902] ERROR [Producer clientId=console-producer] Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:509)
    at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:364)
    at org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
    at org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
    at org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
    at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-30 14:35:17,904] ERROR [Producer clientId=console-producer] Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: There are no in-flight requests for node -1
    at org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
    at org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-30 14:35:17,938] ERROR Uncaught exception in thread 'kafka-producer-network-thread | console-producer': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.lang.Thread.run(Thread.java:748)
3
It works for Kafka version 2.0.0Jigar Naik

3 Answers

3
votes

Honestly, I don't know how Firewall/Antivirus played some role but somehow its linked to this problem.

In my case, Consumer was able to consume messages only once but it throws exception BufferUnderflowException when I started the consumer again.

Turning-Off Firewall/Antivirus was't option for me so I added/enabled advertised.listeners=PLAINTEXT://localhost:9092 configuration in server.properties of kafka-server and afterwards it works as expected.

kafka version: 2.6.0

1
votes

Finally I found the issue, the port was being used by sonarqube running on my local but surprisingly when I started kafka there was no error reported.

1
votes

My norton antivirus firewall settings was causing this issue. I disabled firewall settings and everything started working fine.