I want to publish some generic data to a Kafka topic using Apache NiFi, and I want this data to be in Avro format. Here is what I've done so far:
- Created a new schema in the Schema Registry:
      {
        "type": "record",
        "name": "my_schema",
        "namespace": "my_namespace",
        "doc": "",
        "fields": [
          { "name": "key", "type": "int" },
          { "name": "value", "type": [ "null", "int" ] },
          { "name": "event_time", "type": "long" }
        ]
      }
- Created a simple NiFi pipeline with ConvertAvroSchema and PublishKafkaRecord processors, plus AvroReader and AvroRecordSetWriter controller services.
Then I try to read the topic using Kafka Streams:
    public class Test {
        private final static Logger logger = Logger.getLogger(KafkaFilterUsingCacheAvro.class);

        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
            properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, GenericRecord> source = builder.stream("topic");
            source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));

            Topology topology = builder.build();
            KafkaStreams streams = new KafkaStreams(topology, properties);
            streams.start();
        }
    }
GenericAvroSerde is taken from https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java
As a result, I get these errors:
    Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
    Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I also tried to explicitly set the Avro schema in the AvroReader and AvroRecordSetWriter, but it didn't help. Also, if I simply read the raw bytes from the topic and convert them to a string, I get something like this:
Objavro.schema{"type":"record","name":"my_schema","namespace":"my_namespace","doc":"","fields":[{"name":"key","type":"int"},{"name":"value","type":["null","int"]},{"name":"event_time","type":"long"}]}avro.codecsnappyÛ4ým[©q ÃàG0 ê¸ä»/}½{Û4ým[©q ÃàG0
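The dump starts with "Obj" followed by the embedded schema, which as far as I understand is how an Avro object container file begins (the bytes 'O', 'b', 'j', 1), whereas the Confluent deserializer expects a leading magic byte 0 followed by a 4-byte schema ID. Below is a minimal sketch for checking which framing a record value has. The class name PayloadFormat and the method detect are hypothetical names made up for illustration, and the sample bytes in main are hand-written, not copied from the topic:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Kafka, Avro or NiFi): inspects the first
// bytes of a Kafka record value and reports which framing it appears to use.
public class PayloadFormat {

    /** Returns a rough label for the framing of a record value. */
    public static String detect(byte[] value) {
        if (value == null || value.length == 0) {
            return "empty";
        }
        // Avro object container files start with the 4 bytes 'O', 'b', 'j', 1.
        if (value.length >= 4
                && value[0] == 'O' && value[1] == 'b' && value[2] == 'j' && value[3] == 1) {
            return "avro-container-file";
        }
        // Confluent serializers write magic byte 0, then a 4-byte big-endian schema ID.
        if (value.length >= 5 && value[0] == 0) {
            int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
            return "confluent-wire-format(schemaId=" + schemaId + ")";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        // Hand-written sample that only mimics the start of the dump above.
        byte[] sample = "Obj\u0001avro.schema".getBytes(StandardCharsets.ISO_8859_1);
        System.out.println(detect(sample)); // prints "avro-container-file"
    }
}
```

If the values in the topic are classified as a container file, that would be consistent with the "Unknown magic byte!" error, since the first byte is 'O' rather than 0.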
How can I fix it?