KafkaProducer is not able to pick schema.registry.url defined in its properties .
As we can see in following screenshot , the schema registry url is a dummy url
// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;
But still in my logs the publishing of messaging using KafkaProducer fails with host as http://0:8081
{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/
These two above mentioned proofs were taken in one single run of the programme . As we can clearly see the schmearegistry url prompting during eclipse debugging is 123.1.1.1 but it is http://0 in case of my failed logs .
Because of this in my other environment i am not able to run other assigned schema.registry.url because it is always using http://0
The code is hosted on one machine and the schema registry / broker is on another .
The registry was started in development environment ./confluent start
My Producer Code :
private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;
public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
// key is null
ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
try {
Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {
if (metadata != null) {
log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
+ " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
}
});
RecordMetadata recordMetadata = futureHandle.get();
} catch (Exception e) {
if (e.getCause() != null)
log.error("Monitoring - " + e.getCause().toString());
throw new RuntimeException(e.getMessage(), e.getCause());
} finally {
// initializer.getKafkaProducer().close();
}
}
@PostConstruct
private void initialize() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// kafkaProps.put("value.serializer",
// "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here
props.put("acks", "all");
kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}
MonitoringConfig : public class MonitoringConfig {
@Value("${kafka.zookeeper.connect}")
private String zookeeperConnect;
@Value("${kafka.broker.list}")
private String brokerList;
@Value("${consumer.timeout.ms:1000}")
private String consumerTimeout;
@Value("${producer.retry.count:1}")
private String producerRetryCount;
@Value("${schema.registry.url}")
private String schemaRegistryUrl;
@Value("${consumer.enable:false}")
private boolean isConsumerEnabled;
@Value("${consumer.count.thread}")
private int totalConsumerThread;
}
application.properties :
kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082
The custom avro serializer is something which I need to deprecate and use the way as discussed here but I am sure that is not the cause of this problem .
Here are the details of hosts : HOST 1 : Has this Java service and Kafka Connect and the error logs are coming here . HOST 2: Has Kafka , Schema Registry and Zookeper .

schema.registry.urlis ever actually taken out of the Properties and used to define theCachedSchemaRegistryClientclass... In other words, your properties have the string, sure, that's clearly working, but that's not enough to override the default behavior of the Registry client - OneCricketeer