4 votes

Is there any solution to this problem? I am unable to read Kafka Avro (Schema Registry) messages. I am trying to send messages from Logstash to Kafka and on to HDFS.

The following is the tech stack:

  1. Logstash 2.3 - current production version
  2. Confluent 3.0
  3. Plugins: logstash-output-kafka, logstash-codec-avro
  4. ZooKeeper: 3.4.6
  5. Kafka: 0.10.0.0

Logstash config file looks like this:

input {
  stdin {}
}

filter {
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output {
  kafka {
    topic_id => "logstash_logs14"
    codec => avro {
      schema_uri => "/opt/logstash/bin/schema.avsc"
    }
  }
}

The schema.avsc file looks like this:

{
    "type": "record",
    "name": "myrecord",
    "fields": [
        {"name": "message", "type": "string"},
        {"name": "host", "type": "string"}
    ]
}

The following commands were run:

  1. Start ZooKeeper in its own terminal:

    ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

  2. Start Kafka in its own terminal:

    ./bin/kafka-server-start ./etc/kafka/server.properties

  3. Start the Schema Registry in its own terminal:

    ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

  4. From the Logstash directory, run:

    bin/logstash -f ./bin/logstash.conf

  5. Type the log message you wish to send to Kafka after running the above command, e.g. "Hello World".

  6. Consume the topic from Kafka:

    ./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning

While consuming, we get the following error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Please let me know how to solve this problem.

Thanks, Upendra


2 Answers

1 vote

How are you writing/publishing to Kafka? You are seeing the SerializationException because the data was not written with the Schema Registry serializer (KafkaAvroSerializer), but you are consuming with one: kafka-avro-console-consumer internally uses KafkaAvroDeserializer, which expects every message in the Confluent wire format (specifically <magic byte><schema id><avro data>). If you write the Avro data with kafka-avro-console-producer, you shouldn't get this exception. Alternatively, set KafkaAvroSerializer as the key and value serializer in your producer properties and point it at the Schema Registry:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Write keys and values in the Confluent wire format: <magic byte><schema id><avro data>
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
// The serializer registers and looks up schemas here
props.put("schema.registry.url", "http://localhost:8081");
0 votes

Maybe I'm answering too late, but I'm facing the same problem right now.

Logstash is using the default serializer here, org.apache.kafka.common.serialization.StringSerializer.

So if you want to read Avro messages from your event bus, you have to serialize them on the Logstash Kafka output with io.confluent.kafka.serializers.KafkaAvroSerializer, then use the matching deserializer on the consumer side. The problem is that Logstash does not recognize io.confluent at all, so you have to do some tricky stuff to add it as dependencies and jars.
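
For the consumer side, a minimal sketch of the matching deserializer configuration, assuming the broker and Schema Registry from the question; the group id avro-consumer-group is a placeholder:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "avro-consumer-group"); // placeholder group id
        props.put("auto.offset.reset", "earliest");
        // Matching deserializer: strips <magic byte><schema id> and fetches the schema
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("logstash_logs14"));
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(100);
                for (ConsumerRecord<Object, Object> record : records) {
                    // Values come back as Avro GenericRecord with message/host fields
                    System.out.println(record.value());
                }
            }
        }
    }
}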