
I'm learning Kafka and have made the leap to using Maven.

I have a standalone Kafka instance in AWS and a Maven application on my laptop. I've written a small application that acts as a producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {
        // create producer properties
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<IP_TO_REMOTE_SERVER>:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a producer record
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "jello there");
        System.out.println("SENDING RECORD");
        // send data (asynchronous); flush() waits for buffered records to complete
        producer.send(record);
        producer.flush();

        producer.close();
        System.out.println("complete");
    }
}

When I run this, it looks as though I can't connect to the remote instance. I get the following error:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 0 (/xx.xx.xx.xx:9092) could not be established. Broker may not be available.

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

After searching Stack Overflow, I updated the listeners setting in server.properties to the private IP of the server:

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://10.0.1.51:9092
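Why the private IP alone is not enough: the producer only uses the bootstrap address for its first metadata request. The broker replies with the address it advertises for each broker, and all later traffic goes to that address. (Bootstrap connections are logged with negative node ids, so a warning about node 0 suggests the metadata request itself succeeded.) If advertised.listeners is not set, the broker advertises its listeners value, so a client outside the VPC is handed 10.0.1.51. Below is a minimal Java model of that fallback rule; the class and method names are hypothetical, and it does not model the getCanonicalHostName() fallback used when neither property is set.

```java
import java.util.Properties;

// Hypothetical model of which listener string a broker advertises to clients.
final class AdvertisedListenerModel {

    // advertised.listeners wins; if it is unset or blank, the broker falls back
    // to listeners. The getCanonicalHostName() fallback used when neither is
    // set is NOT modelled: this method returns null in that case.
    static String effectiveAdvertised(Properties serverProps) {
        String advertised = serverProps.getProperty("advertised.listeners");
        if (advertised != null && !advertised.trim().isEmpty()) {
            return advertised.trim();
        }
        String listeners = serverProps.getProperty("listeners");
        if (listeners != null && !listeners.trim().isEmpty()) {
            return listeners.trim();
        }
        return null;
    }

    public static void main(String[] args) {
        // The broker config from the question: only listeners is set.
        Properties server = new Properties();
        server.setProperty("listeners", "PLAINTEXT://10.0.1.51:9092");
        // Prints PLAINTEXT://10.0.1.51:9092, the private VPC address.
        System.out.println(effectiveAdvertised(server));
    }
}
```

Setting advertised.listeners to the public IP changes what clients are told, while the broker keeps binding to the private IP.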

How should I configure Kafka on the server to be accessible and listening remotely?

Why don't you use the public IP address in the "listeners" section? The IP must be reachable so you can connect to the brokers. - Guido
I couldn't set listeners to the public IP, because an EC2 instance in a VPC only knows its private IP; the public IP is NAT'd by AWS. However, you got me thinking: this blog post, rmoff.net/2018/08/02/kafka-listeners-explained, mentions advertised addresses, and setting advertised.listeners to the public IP did the trick. - the_good_pony

2 Answers


I suspect your main problem is configuration. Check that you have made all the necessary changes before connecting with the producer. You need to make the following changes:

Kafka change: You need to add configuration for the relevant brokers in zookeeper.properties.

AWS change: When connecting to AWS, you need to set up a way to pass the .pem file. You might also need to enable direct access to the AWS instance (for example, an inbound security group rule for port 9092), because by default AWS blocks unknown inbound traffic.
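Before changing Kafka settings, it can help to confirm that the broker port is reachable from the laptop at all. If a plain TCP connection to the public IP on 9092 fails, the problem is network access rather than Kafka configuration. A minimal sketch using only java.net; the class name and the 3-second timeout are illustrative choices, and <PUBLIC_IP_ADDRESS> is a placeholder you pass as the first argument.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic: can we open a TCP connection to host:port?
final class PortProbe {

    // Returns true if a TCP connection succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Timeouts, refused connections and unresolvable hosts all land here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the broker's public IP; 9092 is the listener port from server.properties.
        String host = args.length > 0 ? args[0] : "<PUBLIC_IP_ADDRESS>";
        System.out.println(host + ":9092 reachable? " + isReachable(host, 9092, 3000));
    }
}
```

A true result only shows that the bootstrap port is open; the producer can still fail afterwards if the broker advertises an address the laptop cannot reach.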

Other approach: I would recommend creating a certificate and key file that whitelist your own PC as a valid source. Add that certificate to the keystore and truststore on the AWS server instance. Then change listeners in server.properties to SSL://your.host.name:9092 and update your BOOTSTRAP_SERVERS_CONFIG to match.
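On the client side, the SSL approach means the producer needs its own SSL settings in addition to pointing at the SSL listener. The sketch below builds them with the standard Kafka client configuration keys written as plain strings, so it compiles without the Kafka jar. The helper name, host, paths and passwords are placeholders, and the keystore entries are only needed if the broker authenticates clients (ssl.client.auth=required).

```java
import java.util.Properties;

// Hypothetical helper: client-side settings for an SSL listener.
// Keys are the standard Kafka client config names, written as plain strings.
final class SslClientConfig {

    static Properties sslProducerProps(String bootstrap,
                                       String truststorePath, String truststorePassword,
                                       String keystorePath, String keystorePassword) {
        Properties p = new Properties();
        // Same key that ProducerConfig.BOOTSTRAP_SERVERS_CONFIG refers to.
        p.setProperty("bootstrap.servers", bootstrap);
        // Speak TLS to the broker instead of PLAINTEXT.
        p.setProperty("security.protocol", "SSL");
        // Truststore: lets the client verify the broker's certificate.
        p.setProperty("ssl.truststore.location", truststorePath);
        p.setProperty("ssl.truststore.password", truststorePassword);
        // Keystore: the client's own certificate, only needed when the broker
        // authenticates clients. Pass null as keystorePath to skip it.
        if (keystorePath != null) {
            p.setProperty("ssl.keystore.location", keystorePath);
            p.setProperty("ssl.keystore.password", keystorePassword);
        }
        return p;
    }

    public static void main(String[] args) {
        // Placeholder host, paths and passwords: replace with your own.
        Properties props = sslProducerProps("your.host.name:9092",
                "/path/to/client.truststore.jks", "changeit",
                "/path/to/client.keystore.jks", "changeit");
        // Print only the keys so passwords are not echoed.
        props.stringPropertyNames().forEach(System.out::println);
    }
}
```

In ProducerDemo these could be merged in with properties.putAll(...) alongside the serializer settings. Since Kafka 2.0 the client checks that the broker certificate matches the host it connects to (ssl.endpoint.identification.algorithm defaults to https), so the certificate should cover the name or IP clients actually use.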


Seeing the responses got me thinking about how to change my config to make this work. I found a really good blog article addressing this issue: rmoff.net/2018/08/02/kafka-listeners-explained.

My setup

I would stress that this is not a production setup.

A single AWS EC2 instance with Kafka installed, in a public subnet of a VPC. I connect to Kafka remotely as a producer from a Maven application on my laptop.

No changes to zookeeper.properties.

Updated server.properties, specifically listeners and advertised.listeners:

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://<PRIVATE_IP_ADDRESS>:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://<PUBLIC_IP_ADDRESS>:9092
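The split works because listeners controls the address the broker binds to (the private IP, the only one the EC2 instance sees), while advertised.listeners is what the broker hands to clients in metadata. A quick sanity check for this kind of setup is that the advertised host must not be a private address when clients connect from outside the VPC. A minimal sketch, assuming a single listener entry in the PLAINTEXT://host:port form used above; the class and method names are hypothetical, and 203.0.113.10 is a documentation address (RFC 5737) standing in for the public IP.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;

// Hypothetical sanity check for a broker that serves clients outside its VPC.
final class AdvertisedCheck {

    // Host part of a single listener entry, e.g. "PLAINTEXT://10.0.1.51:9092" -> "10.0.1.51".
    static String hostOf(String listener) {
        return URI.create(listener.trim()).getHost();
    }

    // True if the advertised host is a private (RFC 1918), loopback or link-local
    // address, i.e. one a laptop outside the VPC cannot route to.
    static boolean advertisesPrivateAddress(String listener) {
        String host = hostOf(listener);
        if (host == null) {
            throw new IllegalArgumentException("No host in listener: " + listener);
        }
        try {
            // For an IP literal, getByName only parses it; a hostname triggers a DNS lookup.
            InetAddress addr = InetAddress.getByName(host);
            return addr.isSiteLocalAddress() || addr.isLoopbackAddress() || addr.isLinkLocalAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot resolve " + host, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(advertisesPrivateAddress("PLAINTEXT://10.0.1.51:9092"));    // true
        System.out.println(advertisesPrivateAddress("PLAINTEXT://203.0.113.10:9092")); // false
    }
}
```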

Then, in my Maven code, BOOTSTRAP_SERVERS_CONFIG references the public IP:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) {
        // create producer properties; bootstrap via the broker's public IP
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<PUBLIC_IP_ADDRESS>:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // create a producer record
        ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "good pony");
        System.out.println("SENDING RECORD");
        // send data (asynchronous); flush() waits for buffered records to complete
        producer.send(record);
        producer.flush();

        producer.close();
        System.out.println("complete");
    }
}

This runs successfully:

SENDING RECORD

[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1]

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

complete

The message then shows up in the consumer.
