I'm trying to send messages from a Kafka producer on one PC to a Kafka broker on another PC over Wi-Fi, but the messages don't appear in the specified topic on the broker.

I connected the two PCs through an ASUS wireless router and disabled all firewalls on both PCs and on the router. Each PC can ping the other successfully. When I switch to a wired connection, it works: messages are ingested into the specified topic on the Kafka broker PC.

Kafka Producer:

// CarDataProducer.java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CarDataProducer {
    public static void main(String[] args) {

        CarDataProducer fProducer = new CarDataProducer();
        Producer<String, CarData> producer = fProducer.initializeKafkaProducer();

        String topicName = "IN-DATA";

        CSVReaderCarData csvReader = new CSVReaderCarData();
        List<CarData> carDataList = csvReader.readCarDataFromCSV("data/mllib/TrainTest_101.csv");

        // Read records from the CSV file and send each one to the topic.
        for (CarData val : carDataList) {
            producer.send(new ProducerRecord<>(topicName, val));
        }

        // Flush any buffered records and release resources before the JVM exits.
        producer.close();
    }

    public KafkaProducer<String, CarData> initializeKafkaProducer() {

        // Set the producer configuration properties.
        Properties props = ProducerProperties.getInstance();

        // Instantiate the producer.
        return new KafkaProducer<>(props);
    }
}

// ProducerProperties.java
import java.util.Properties;

public class ProducerProperties {
    private ProducerProperties() {
    }

    public static final Properties props = new Properties();
    static {
        // Address of the broker PC, used for the initial (bootstrap) connection.
        props.put("bootstrap.servers", "192.168.1.124:9092");
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 500);
        props.put("linger.ms", 500);
        props.put("buffer.memory", 500);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "com.iov.safety.vehicleproducer.CarDataSerializer");
    }

    public static Properties getInstance() {
        return props;
    }
}

I check whether messages arrive using the console consumer on the server side:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IN-DATA

When messages arrive, the output looks like this:

kafka-console-consumer.sh --bootstrap-server 192.168.1.124:9092 --topic IN-DATA
{"instSpeed":19.0,"time":15.0,"label":0.0}
{"instSpeed":64.0,"time":15.0,"label":1.0}
{"instSpeed":10.0,"time":16.0,"label":0.0}

ifconfig on server side


ipconfig on kafka producer side


  • server.properties:

listeners=PLAINTEXT://:9092

  • netstat -ano|grep '9092'

tcp6 0 0 :::9092 :::* LISTEN off (0.00/0/0)
tcp6 0 0 127.0.0.1:53880 127.0.1.1:9092 ESTABLISHED keepalive (6659.53/0/0)
tcp6 0 0 127.0.1.1:9092 127.0.0.1:53880 ESTABLISHED keepalive (6659.53/0/0)
tcp6 0 0 127.0.1.1:9092 127.0.0.1:53878 ESTABLISHED keepalive (6659.15/0/0)
tcp6 0 0 127.0.0.1:53878 127.0.1.1:9092 ESTABLISHED keepalive (6659.15/0/0)


After adding a callback to the producer's send() call, I get a timeout error:

org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for IN-DATA-0: 30045 ms has passed since last append

1 Answer

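I solved it. Each Kafka broker has to advertise a hostname or IP address that is reachable from the Kafka producer on the other PC. The producer uses bootstrap.servers only for its first connection; after that it connects to the address the broker advertises in its metadata. The advertised address can be set when starting the broker: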

kafka-server-start.sh config/server.properties --override advertised.listeners=PLAINTEXT://192.168.1.124:9092

Alternatively, set it in config/server.properties:

advertised.listeners=PLAINTEXT://your.host.name:9092

or

advertised.listeners=PLAINTEXT://192.168.1.124:9092
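
To double-check the setting before restarting the broker, something like the following rough sketch might help. It uses only the JDK (no Kafka classes) and assumes a single `PLAINTEXT://host:port` style entry. The class and method names are made up for illustration. It reads the text of a server.properties file, pulls out the host from advertised.listeners, and reports whether that host is explicitly set and is not a loopback address. As far as I understand, when advertised.listeners is not set the broker falls back to listeners or to the machine's canonical hostname, which on my server apparently resolved to 127.0.1.1, so the sketch treats "not set" as something to look at rather than as a definite error.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: inspects advertised.listeners in server.properties text.
public class AdvertisedListenerCheck {

    /** Returns the host of the first advertised.listeners entry, or null if it is unset or has no host. */
    public static String advertisedHost(String serverPropertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverPropertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String listeners = props.getProperty("advertised.listeners");
        if (listeners == null || listeners.trim().isEmpty()) {
            return null;
        }
        // Take the first entry, e.g. PLAINTEXT://192.168.1.124:9092
        String first = listeners.split(",")[0].trim();
        int schemeEnd = first.indexOf("://");
        int portStart = first.lastIndexOf(':');
        if (schemeEnd < 0 || portStart <= schemeEnd) {
            return null; // not in scheme://host:port form
        }
        String host = first.substring(schemeEnd + 3, portStart);
        return host.isEmpty() ? null : host;
    }

    /** True if the host is set, resolves, and is not a loopback address such as 127.0.1.1. */
    public static boolean isExplicitNonLoopback(String host) {
        if (host == null) {
            return false;
        }
        try {
            return !InetAddress.getByName(host).isLoopbackAddress();
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String before = "listeners=PLAINTEXT://:9092\n";
        String after = "listeners=PLAINTEXT://:9092\n"
                + "advertised.listeners=PLAINTEXT://192.168.1.124:9092\n";

        for (String config : new String[] {before, after}) {
            String host = advertisedHost(config);
            System.out.println("advertised host: " + host
                    + ", explicit and non-loopback: " + isExplicitNonLoopback(host));
        }
    }
}
```

With an IP address in the entry no DNS lookup is needed. With a hostname, the result depends on how that name resolves on the machine running the check, so it is best run on the producer PC.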