3
votes

I am trying to send messages to Kafka which is installed in a Ubuntu VM.

There are 3 Kafka brokers which have been started in the VM and a Consumer listening on the Topic also in the VM. All this works fine.

In Intellij on Windows 7 I have written a small demo app for a KafkaProducer,

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerApp {

    public static void main(String [] args){

        // Create a properties dictionary for the required/optional Producer config settings:
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091,localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //--> props.put("config.setting", "value");
        //:: http://kafka.apache.org/documentation.html#producerconfigs

        System.out.println("program start");

        KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(props);

        try{
            for (int i = 0; i < 150; i++){
                myProducer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), "MyMessage: " + Integer.toString(i)));
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            System.out.println("program close");
            myProducer.close();
        }

        System.out.println("program end");

    }

}

When I try running the code though it doesnt connect to the Message Brokers - instead seems to go into an infinite loop and only stops when I click on Stop. Below is the output from the Intellij Terminal window. I get the

Connection Refused

exception below. Anyone have any ideas how I can solve this ?

Is there anyway I can check the connection from Windows to the Ubuntu VM port number shown ? I am wondering if there is potentially a firewall issue preventing the connection.

"C:\Program Files\Java\jdk1.8.0_131\bin\java" -Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2017.1.3\lib\idea_rt.jar=62649:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2017.1.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_131\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_131\jre\lib\rt.jar;C:\Dev\kafka-poc\target\classes;C:\Users\rstannard\.m2\repository\org\apache\kafka\kafka-clients\0.10.2.1\kafka-clients-0.10.2.1.jar;C:\Users\rstannard\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;C:\Users\rstannard\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;C:\Users\rstannard\.m2\repository\org\slf4j\slf4j-api\1.7.21\slf4j-api-1.7.21.jar;C:\Users\rstannard\.m2\repository\org\slf4j\slf4j-simple\1.7.21\slf4j-simple-1.7.21.jar" com.riskcare.kafkapoc.KafkaProducerApp
program start
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [localhost:9091, localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
[main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(id = null, nodes = [localhost:9092 (id: -2 rack: null), localhost:9091 (id: -1 rack: null)], partitions = [])
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors
[main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size-max
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
[main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9091.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
    at java.lang.Thread.run(Thread.java:748)
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Node -1 disconnected.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -2 for sending metadata request
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -2 at localhost:9092.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-sent
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.bytes-received
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--2.latency
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
    at java.lang.Thread.run(Thread.java:748)
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Node -2 disconnected.
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9091.

Process finished with exit code 1

Below is some output from Ubuntu

enter image description here


Update to answer from ppatierno (I cant add images to comments so adding an update here instead)

Thank you for your comment. Below is an image from,
1/ my Windows Terminal using Telnet to try and connect to my VM session
2/ a screenshot from my Ubuntu VM session showing the ports open on Localhost 127.0.0.1.
3/ ipconfig showing my ip address from my Host
4/ ifconfig showing ip address from my Ubuntu VM (Guest).

Am I missing anything ?

enter image description here

enter image description here

UPDATE

I've managed to get the IP and Port recognised, the below link helped me to configure my VM,

https://www.howtogeek.com/122641/how-to-forward-ports-to-a-virtual-machine-and-use-it-as-a-server/

but now I am running into another issue when I try to run my Java Producer application. The logs seem to suggest that its able to recognise the 3-Brokers but for some reason the program is unable to connect with a Broker.

[kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 192.168.56.101:9091. [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1. Fetching API versions. [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node -1. [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node -1: (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=my-topic) to node -1 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = QZEI79XCTmC8qJ48K1JwLw, nodes = [C0287.VM.xxxxxxxx.com:9092 (id: 2 rack: null), C0287.VM.xxxxxxxx.com:9090 (id: 0 rack: null), C0287.VM.xxxxxxxx.com:9091 (id: 1 rack: null)], partitions = [Partition(topic = my-topic, partition = 0, leader = 1, replicas = [0,1,2], isr = [1,2,0]), Partition(topic = my-topic, partition = 1, leader = 2, replicas = [0,1,2], isr = [2,0,1]), Partition(topic = my-topic, partition = 2, leader = 0, replicas = [0,1,2], isr = [0,1,2])]) [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at C0287.VM.xxxxxxxx.com:9090. program close [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at C0287.VM.xxxxxxxx.com:9090: java.io.IOException: Can't resolve address: C0287.VM.XXX.com:9090 at org.apache.kafka.common.network.Selector.connect(Selector.java:182) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126) at java.lang.Thread.run(Thread.java:748) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) at org.apache.kafka.common.network.Selector.connect(Selector.java:179) ... 5 more [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records. [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1 at C0287.VM.xxxxxxxx.com:9091. [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 1 at C0287.VM.xxxxxxxx.com:9091: java.io.IOException: Can't resolve address: C0287.VM.xxxxxxxx.com:9091

5

5 Answers

3
votes

Try providing dns mapping in your host file on the host machine (windows host name file). You can find it under System32/etc folder.

That may help.

3
votes

Make sure all three of your kafka brokers are configured with advertised listeners on an IP address and port that is reachable from your app running outside the VM.

See advertised.listeners broker parameter in the documentation here

https://kafka.apache.org/documentation/#brokerconfigs

1
votes

From what you are saying it seems to me that the producer is running on the host and not inside the VM (as the consumer). You are trying to connect from the host using :

localhost:9091,localhost:9092

you should use the VM IP address or allowing the VM to share the host network.

1
votes

I had the same problem with Ubuntu 16.04 and Docker. For me worked solution from Sudhesh Rajan. In /etc/hosts I changed DNS mapping. Just make sure that you are not trying to map ports.

0
votes

change your conf/server.properties file as follow :

listeners=PLAINTEXT://192.168.1.120:9092

or

listeners=PLAINTEXT://hostname:9092