0
votes

KafkaProducer is not able to pick schema.registry.url defined in its properties .

As we can see in following screenshot , the schema registry url is a dummy url

// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;

enter image description here

But still in my logs the publishing of messaging using KafkaProducer fails with host as http://0:8081

{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/

These two above mentioned proofs were taken in one single run of the programme . As we can clearly see the schmearegistry url prompting during eclipse debugging is 123.1.1.1 but it is http://0 in case of my failed logs .

Because of this in my other environment i am not able to run other assigned schema.registry.url because it is always using http://0

The code is hosted on one machine and the schema registry / broker is on another .

The registry was started in development environment ./confluent start

My Producer Code :

private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;

public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
    // key is null
    ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
    try {
        Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {

            if (metadata != null) {
                log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
                        + " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
            }
        });
        RecordMetadata recordMetadata = futureHandle.get();

    } catch (Exception e) {
        if (e.getCause() != null)
            log.error("Monitoring - " + e.getCause().toString());
        throw new RuntimeException(e.getMessage(), e.getCause());
    } finally {
        // initializer.getKafkaProducer().close();
    }
}

@PostConstruct
private void initialize() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    // kafkaProps.put("value.serializer",
    // "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
    props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
    props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here 
    props.put("acks", "all");
    kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}

MonitoringConfig : public class MonitoringConfig {

    @Value("${kafka.zookeeper.connect}")
    private String zookeeperConnect;

    @Value("${kafka.broker.list}")
    private String brokerList;

    @Value("${consumer.timeout.ms:1000}")
    private String consumerTimeout;

    @Value("${producer.retry.count:1}")
    private String producerRetryCount;

    @Value("${schema.registry.url}")
    private String schemaRegistryUrl;

    @Value("${consumer.enable:false}")
    private boolean isConsumerEnabled;

    @Value("${consumer.count.thread}")
    private int totalConsumerThread;
}

application.properties :

kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082

The custom avro serializer is something which I need to deprecate and use the way as discussed here but I am sure that is not the cause of this problem .

Here are the details of hosts : HOST 1 : Has this Java service and Kafka Connect and the error logs are coming here . HOST 2: Has Kafka , Schema Registry and Zookeper .

1
Where is everything running? Same machine? Different machines? How did you start the registry? Can you show its config file? Help us reproduce the problem with a minimal reproducible example, including your producer code - OneCricketeer
Sure, but my point is that in the Confluent serializer, for example, it defaults the registry to localhost. Which is why I ask to see yours... If you want to deprecate yours, then now is a good moment to do so docs.confluent.io/current/schema-registry/docs/… - OneCricketeer
Your previous post does not show where schema.registry.url is ever actually taken out of the Properties and used to define the CachedSchemaRegistryClient class... In other words, your properties have the string, sure, that's clearly working, but that's not enough to override the default behavior of the Registry client - OneCricketeer
Again, please edit your question to include the entire Serializer class, don't link to a separate post - OneCricketeer
I personally find it easier to implement a single schema in a topic. JSON payloads are just wasteful for network traffic optimization. If it's metrics, not sure what more you need than an app/metric name, a list of optional tags for later filtering, a timestamp, and the actual metric value - OneCricketeer

1 Answers

2
votes

You're using a custom serializer, and as part of the Serializer implementation, you must define a configure method that accepts a Map.

Within that method, I'm guessing you defined a CachedSchemaRegistryClient field, but did not extract the url property that's added at the Producer level from the config map, and so it'll default to using some other localhost address

The Confluent code requires stepping through four classes, but you'll see the Serializer implementation here, then look at the separate Config class, as well as the Abstract Serializer and SerDe parent classes. From your previous post, I had pointed out that I didn't think you needed to actually use a custom Avro serializer because you seemed to be redoing what the AbstractKafkaSerializer class does