1 vote

I have created a topic in MSK (Kafka) and registered an Avro schema for it. Now I am trying to produce a message to the topic, but when I run my producer I get the error below:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic AVRO-AUDIT_EVENT not present in metadata after 60000 ms.

Here is my Java code to produce the Avro message:

String topicName = "AVRO-AUDIT_EVENT";

Properties props = new Properties();
props.put("bootstrap.servers",
        "b-3.*****:9092,b-4.****:9092,b-5.****:9092");
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/");

JSONObject job = new JSONObject(json);
String bodyofJson = job.getString("body");
JSONObject bodyJsonObj = new JSONObject(bodyofJson);
System.out.println(bodyJsonObj.get("ID"));

Producer<String, String> producer = new KafkaProducer<>(props);
try {
    producer.send(new ProducerRecord<String, String>(topicName, bodyJsonObj.get("ID").toString(), bodyofJson))
            .get();

    System.out.println("Complete");
} catch (Exception ex) {
    ex.printStackTrace(System.out);
} finally {
    producer.close();
}

I can list the topics, see the topic name, and read messages from the topic, but when I run this producer I get the error above.
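For reference, this is how I check that the topic exists, assuming the standard Kafka CLI tools and the same MSK broker addresses as in the producer config:

```shell
# Describe the topic against one of the MSK brokers
# (same broker host as in the producer's bootstrap.servers)
bin/kafka-topics.sh --bootstrap-server b-3.*****:9092 \
  --describe --topic AVRO-AUDIT_EVENT
```

This prints the partition and replica assignment, which is why I believe the topic itself is present.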

Regards


2 Answers

2 votes

I assume you are not running Kafka locally...

Remove this line

props.put("bootstrap.servers", "localhost:9092,localhost:9093");

Maybe deploy the Schema Registry in AWS as well, and change this line so it points at the registry root rather than a subject path (and update the address accordingly)

props.put("schema.registry.url", "http://localhost:8081/");

Also, if you want to send Avro, create a GenericRecord Avro object rather than a JSON string. Otherwise, your registered schema is just "string".
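A minimal sketch of that, assuming a record schema with a single string ID field (the schema and field names here are placeholders; use whatever schema you actually registered):

```java
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroAuditProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // MSK brokers only -- one bootstrap.servers entry, no localhost override
        props.put("bootstrap.servers", "b-3.*****:9092,b-4.****:9092,b-5.****:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Registry root, not a /subjects/... path; change the host to where
        // your Schema Registry is actually deployed
        props.put("schema.registry.url", "http://localhost:8081/");

        // Placeholder schema -- it should match the schema you registered
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AuditEvent\","
              + "\"fields\":[{\"name\":\"ID\",\"type\":\"string\"}]}");

        GenericRecord event = new GenericData.Record(schema);
        event.put("ID", "some-id");

        // Note the value type is now the Avro record, not String
        try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("AVRO-AUDIT_EVENT", "some-id", event)).get();
        }
    }
}
```

With this, the serializer registers/validates the record schema under the subject derived from the topic name, instead of treating every message as a plain string.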

1 vote

In my case, the client lost its connection to all the nodes over the weekend, and it did not recover the connection automatically.

After restarting the service, it worked just fine.