2
votes

I'm using the following code (not literally, but let's assume it) to create a schema and send messages to Kafka with a producer.

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public static final String USER_SCHEMA = "{"
        + "\"type\":\"record\","
        + "\"name\":\"myrecord\","
        + "\"fields\":["
        + "  { \"name\":\"str1\", \"type\":\"string\" },"
        + "  { \"name\":\"str2\", \"type\":\"string\" },"
        + "  { \"name\":\"int1\", \"type\":\"int\" }"
        + "]}";

public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);

        Thread.sleep(250);

    }

    producer.close();
}

The problem is that this code only lets me send one message with a given schema. To send the next message I have to change the schema name, so right now the name string is randomly generated, which lets me send more messages. That's a hack, so I'd like to know the proper way to do this.

I've also looked at how to send messages without a schema (i.e. after sending one message with the schema to Kafka, all subsequent messages wouldn't need the schema anymore), but new GenericData.Record(..) expects a schema parameter, and if it's null it throws an error.

So what's the correct way to send Avro schema messages to Kafka?

Here is another code sample, pretty much identical to mine:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java

It also doesn't show how to send without setting a schema.


1 Answer

1
votes

I didn't understand the line:

The thing is the code allows me to send only 1 message with this schema. Then I need to change the schema name in order to send the next message.

In both examples, yours and the Confluent example you supplied, the schema is not sent to Kafka.

In the example you supplied, the schema is used to create a GenericRecord object. You supply the schema because you want to validate the record against it (for example, to ensure you can only put an integer into the int1 field of the GenericRecord object).

In your code, the only difference is that you decided to serialize the data to byte[] yourself, which is probably not needed since you can delegate this responsibility to KafkaAvroSerializer, as you can see in the Confluent example.
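For reference, the delegation to KafkaAvroSerializer mentioned above would look roughly like the configuration sketch below. This is not a runnable example: it assumes the io.confluent:kafka-avro-serializer dependency, a running broker and Schema Registry, and the localhost URLs are placeholders.

```java
// Sketch: let KafkaAvroSerializer turn GenericRecord into byte[] for you
// (localhost addresses are placeholders for your own broker/registry)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// The producer is now typed on GenericRecord, not byte[]
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1");
avroRecord.put("str2", "Str 2");
avroRecord.put("int1", 1);
producer.send(new ProducerRecord<>("mytopic", avroRecord));
producer.close();
```

With this setup the serializer registers the schema with the Schema Registry and embeds only a small schema id in each message, so you never need to rename the schema between sends.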

GenericRecord is an Avro object; it is not something Kafka enforces. If you want to send any kind of object to Kafka (with a schema or without), you just need to create (or use an existing) serializer that converts your object to byte[], and set that serializer in the properties you create for the producer.
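That contract — "give the producer something that maps your object to byte[] and back" — can be illustrated with no Kafka dependency at all. The sketch below uses only the JDK; the UserRecord class and its toBytes/fromBytes methods are invented for illustration and merely stand in for what a real org.apache.kafka.common.serialization.Serializer/Deserializer pair would do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record type mirroring the str1/str2/int1 schema above.
class UserRecord {
    public final String str1;
    public final String str2;
    public final int int1;

    public UserRecord(String str1, String str2, int int1) {
        this.str1 = str1;
        this.str2 = str2;
        this.int1 = int1;
    }

    // Serialize: the same byte[] contract a Kafka value.serializer fulfills.
    public static byte[] toBytes(UserRecord r) {
        try (ByteArrayOutputStream buf = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(r.str1);
            out.writeUTF(r.str2);
            out.writeInt(r.int1);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize: the consumer-side counterpart.
    public static UserRecord fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new UserRecord(in.readUTF(), in.readUTF(), in.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UserRecord original = new UserRecord("Str 1-0", "Str 2-0", 0);
        UserRecord copy = fromBytes(toBytes(original));
        System.out.println(copy.str1 + " " + copy.str2 + " " + copy.int1);
    }
}
```

Once a class like this implements Kafka's Serializer interface, you point value.serializer at it in the producer properties, exactly as your example does with ByteArraySerializer.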

It is usually good practice to send a pointer to the schema along with the Avro message itself. You can find the reasoning at the following link: http://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/