I am following the KafkaStreams course from O'Reiley but having issues running KafkaStreams v2.0.0 in Java; my code and properties configuration is as follows:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "otherhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
// 1 - stream from Kafka
KStream<String, String> textLines = builder.stream("word-count-input");
KTable<String, Long> wordCounts = textLines
// 2 - map values to lowercase
.mapValues(textLine -> textLine.toLowerCase())
// can be alternatively written as:
// .mapValues(String::toLowerCase)
// 3 - flatmap values split by space
.flatMapValues(textLine -> Arrays.asList(textLine.split("\\W+")))
// 4 - select key to apply a key (we discard the old key)
.selectKey((key, word) -> word)
// 5 - group by key before aggregation
.groupByKey()
// 6 - count occurrences
.count("Counts");
// 7 - to in order to write the results back to kafka
wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
KafkaAdminClient is able to connect to "otherhost:9092" and the application runs without issue, but strangely I get errors about the Consumer client attempting to connect to a non-existent localhost instance of kafka instead of "otherhost:9092":
WARN [Consumer clientId=xxxxxxxxx-678dee93-a403-4635-9cfb-ccde35489acc-StreamThread-1-consumer, groupId=xxxxxxxxxx] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:748)
Notice that the Consumer is attempting to connect to "localhost/127.0.0.1:9092" instead of otherhost:9092, why? As such, my streaming application will not work. I am confused because the Kafka Streams documentation (https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#bootstrap-servers) explicitely states: "The Kafka bootstrap servers. This is the same setting that is used by the underlying producer and consumer clients to connect to the Kafka cluster. Example: "kafka-broker1:9092,kafka-broker2:9092". "
What could I be doing wrong? Thanks