
I want to produce some generic data into a Kafka topic using Apache NiFi, and I want this data to be in Avro format. Here is what I've done:

  1. Create a new schema in the Schema Registry:

{ "type": "record", "name": "my_schema", "namespace": "my_namespace", "doc": "", "fields": [ { "name": "key", "type": "int" }, { "name": "value", "type": [ "null", "int" ] }, { "name": "event_time", "type": "long" } ] }

  2. Create a simple NiFi pipeline with ConvertAvroSchema and PublishKafkaRecord processors, using an AvroReader and an AvroRecordSetWriter (settings were shown in screenshots).
  3. Then I try to read it using Kafka Streams:

    import java.util.Properties;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.log4j.Logger;

    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.examples.streams.utils.GenericAvroSerde; // the class linked below

    public class Test {
        private final static Logger logger = Logger.getLogger(Test.class);

        public static void main(String[] args) {
            Properties properties = new Properties();

            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
            properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

            // Log every record read from the topic
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, GenericRecord> source = builder.stream("topic");
            source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));

            Topology topology = builder.build();
            KafkaStreams streams = new KafkaStreams(topology, properties);
            streams.start();
        }
    }

GenericAvroSerde - https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java

As a result I get this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I also tried to explicitly set the Avro schema in the AvroReader/AvroRecordSetWriter, but it didn't help. Also, if I simply read the raw bytes from the topic and convert them to a string, I get something like this:

Objavro.schema{"type":"record","name":"my_schema","namespace":"my_namespace","doc":"","fields":[{"name":"key","type":"int"},{"name":"value","type":["null","int"]},{"name":"event_time","type":"long"}]}avro.codecsnappyÛ4ým[©q ÃàG0 ê¸ä»/}½{Û4ým[©q ÃàG0

How can I fix it?

I don't think you need to download that example serde class... there's already one provided by Confluent. And Avro isn't meant to be human-readable, so there's nothing to "fix" in your string-represented data. – OneCricketeer

In any case, if you're using ExecuteSQL, have you looked at Kafka Connect JDBC, which natively integrates with Avro and the Confluent Schema Registry? – OneCricketeer

@cricket_007 Yes, I've looked at Kafka Connect, but ExecuteSQL is just for generating some random data for testing. At the end of the pipeline I want to use Kafka Connect to store the transformed data in a DB. – Nikita Ryanov

NiFi can store data as well. As far as random data goes, the KSQL project includes a utility called "datagen". – OneCricketeer

Hm, I'll look at KSQL datagen. I didn't consider it before. – Nikita Ryanov

1 Answer


In the PublishKafkaRecord processor, your Avro writer is configured with a "Schema Write Strategy" of "Embedded Avro Schema". This means each message written to Kafka is a standard Avro data file with the full schema embedded, which is exactly what your byte dump shows: the "Obj" file-header magic, the schema JSON, and the snappy codec marker.

On your consumer side (Kafka Streams), the deserializer expects the Confluent Schema Registry wire format: not an embedded Avro schema, but a special sequence of bytes (a magic byte followed by a four-byte schema id) and then the bare Avro message. Since your messages start with "Obj" instead, the deserializer fails with "Unknown magic byte!".
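As an illustration, you can tell the two layouts apart from the first bytes of a record's value. This is only a diagnostic sketch (the class and method names are mine, not from any library); `payload` stands for the raw byte[] value consumed from the topic:

    import java.nio.ByteBuffer;

    public class FormatSniffer {
        // Rough diagnostic: inspect the leading bytes of a raw Kafka record value.
        public static String detect(byte[] payload) {
            if (payload != null && payload.length > 5 && payload[0] == 0x0) {
                // Confluent wire format: magic byte 0x0 + 4-byte big-endian schema id,
                // followed by the bare Avro-encoded record.
                int schemaId = ByteBuffer.wrap(payload, 1, 4).getInt();
                return "Confluent wire format, schema id " + schemaId;
            }
            if (payload != null && payload.length > 4
                    && payload[0] == 'O' && payload[1] == 'b' && payload[2] == 'j') {
                // Avro Object Container File: starts with the magic bytes "Obj" + 0x01
                // and carries the full schema in its header (what your dump shows).
                return "Avro data file with embedded schema";
            }
            return "unknown";
        }
    }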

Assuming you want to keep your consumer as is, on the NiFi side you will want to change your Avro writer's "Schema Write Strategy" to "Confluent Schema Registry Reference". I think this might also require you to change the Avro reader to access the schema through a Confluent Schema Registry service, so that the writer knows which schema id to reference.
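Also, as mentioned in the comments, you don't need the example serde class you linked: Confluent publishes its own GenericAvroSerde. A minimal sketch of the two config lines that would change in your main(), assuming the io.confluent:kafka-streams-avro-serde artifact is on your classpath:

    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
    import org.apache.kafka.streams.StreamsConfig;

    // Confluent's own generic Avro serde; note that the schema registry URL
    // usually needs an explicit scheme, e.g. "http://registry:8081".
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");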

Alternatively, there may be a way to make Kafka Streams read an embedded schema without the Confluent Schema Registry, but I have not used Kafka Streams before, so I can't say for certain that it is possible; a rough idea follows.
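If you do go that route, a custom value deserializer would look roughly like this. This is an untested sketch with a hypothetical class name, and it assumes each Kafka message value is one complete Avro Object Container File holding a single record (NiFi may pack several records into one message, which this does not handle):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    // Hypothetical deserializer for values written with "Embedded Avro Schema":
    // every message is parsed as a self-describing Avro data file.
    public class EmbeddedSchemaAvroDeserializer implements Deserializer<GenericRecord> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public GenericRecord deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                    new ByteArrayInputStream(data), new GenericDatumReader<>())) {
                // Return the first record only; multi-record files are not handled here.
                return stream.hasNext() ? stream.next() : null;
            } catch (IOException e) {
                throw new SerializationException("Value is not an Avro data file", e);
            }
        }

        @Override
        public void close() { }
    }

You would still need to pair it with a matching serializer via Serdes.serdeFrom(serializer, deserializer) before you could plug it in as the default value serde.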