I am trying to send an object to Kafka using the Avro serializer and the Schema Registry.
Here is the simplified code:
Properties props = new Properties();
...
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put(SCHEMA_REGISTRY_URL_CONFIG, "http://" + schemaRegistryHostname + ":8081");
Producer<String, User> producer = new KafkaProducer<>(props);
User user = new User("name", "address", 123);
ProducerRecord<String, User> record = new ProducerRecord<>(topic, key, user);
producer.send(record);
I assumed the serializer would fetch the schema from the registry "behind the scenes" and serialize the object (user), but I get the error below.
What am I missing?
Do I have to read the schema explicitly and send a GenericRecord?
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:123) ~[kafka-avro-serializer-3.3.0.jar!/:?]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:73) ~[kafka-avro-serializer-3.3.0.jar!/:?]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-3.3.0.jar!/:?]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:424) ~[kafka-clients-0.9.0.1.jar!/:?]