0
votes

I am following the Kafka Streams course from O'Reilly but am having issues running Kafka Streams v2.0.0 in Java. My code and properties configuration are as follows:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "otherhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();

    // 1 - stream from Kafka
    KStream<String, String> textLines = builder.stream("word-count-input");
    KTable<String, Long> wordCounts = textLines
            // 2 - map values to lowercase
            .mapValues(textLine -> textLine.toLowerCase())
            // can be alternatively written as:
            // .mapValues(String::toLowerCase)
            // 3 - flatmap values, splitting on non-word characters
            .flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
            // 4 - use the word itself as the new key (we discard the old key)
            .selectKey((key, word) -> word)
            // 5 - group by key before aggregation
            .groupByKey()
            // 6 - count occurrences, materialized in a state store named "Counts"
            .count(Materialized.as("Counts"));

    // 7 - write the results back to Kafka
    wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

KafkaAdminClient is able to connect to "otherhost:9092" and the application starts without issue, but strangely I get warnings about the Consumer client attempting to connect to a non-existent localhost instance of Kafka instead of "otherhost:9092":

    WARN [Consumer clientId=xxxxxxxxx-678dee93-a403-4635-9cfb-ccde35489acc-StreamThread-1-consumer, groupId=xxxxxxxxxx] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748)

Notice that the Consumer is attempting to connect to "localhost/127.0.0.1:9092" instead of "otherhost:9092". Why? As a result, my streaming application does not work. I am confused because the Kafka Streams documentation (https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#bootstrap-servers) explicitly states: "The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: 'kafka-broker1:9092,kafka-broker2:9092'."

What could I be doing wrong? Thanks

1
Can you use kafka-console-consumer and kafka-console-producer to make sure that the configuration of the broker(s) is correct? Just send a single message and see if it gets consumed. - Jacek Laskowski

1 Answer

2
votes

The address that is returned to clients is set by the advertised.listeners setting on the broker.
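For illustration, the relevant part of the broker's server.properties might look like the fragment below. This is a sketch only: it assumes a single PLAINTEXT listener on port 9092 and that "otherhost" resolves from the machine running the Streams application. Your listener names, security protocol, and ports may differ.

```properties
# server.properties on the broker (example values, not taken from the asker's setup)

# Interface and port the broker binds to
listeners=PLAINTEXT://0.0.0.0:9092

# Address handed back to clients in metadata responses;
# it must be resolvable and reachable from the client machine
advertised.listeners=PLAINTEXT://otherhost:9092
```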

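The address given in your code is only used for the initial bootstrap connection. After that, the client connects to the addresses the broker returns in its metadata, which is why your consumer ends up trying localhost:9092.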

You'll need to edit the broker settings so that the broker returns an externally resolvable "advertised" address, then restart the broker.
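Once the broker has been restarted, one way to confirm from the client machine that the advertised address is reachable is a plain TCP connection test. The sketch below uses only the Java standard library. The class name, the `canConnect` helper, and the 3-second timeout are made up for this example, and the default host and port are the ones from the question. It only shows that something accepts connections on that address, not that a healthy Kafka broker is behind it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class AdvertisedAddressCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Fails with an IOException if the host cannot be resolved,
            // the connection is refused, or the timeout expires.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the address from the question; pass your own as arguments.
        String host = args.length > 0 ? args[0] : "otherhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Run it on the machine that hosts the Streams application, for example `java AdvertisedAddressCheck otherhost 9092`. If it reports false for the advertised address, the consumer will not be able to connect to it either.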