Any solution to this problem??? I am unable to read KAFKA-AVRO Schema messages. Iam trying to send messages from logstash to KAFKA to HDFS.
The following is the tech stack:
- Logstash 2.3 - Current production version
- Confluent 3.0.
- Plugins: a. Logstash-kafka-Output plugin b. Logstash-codec-avro.
- zookeeper: 3.4.6
- KAFKA: 0.10.0.0
Logstash config file looks like this:
input {
stdin{}
}
filter {
mutate {
remove_field => ["@timestamp","@version"]
}
}
output {
kafka {
topic_id => 'logstash_logs14'
codec => avro {
schema_uri => "/opt/logstash/bin/schema.avsc"
}
}
}
The schema.avsc file looks like this:
{
"type":"record",
"name":"myrecord",
"fields":[
{"name":"message","type":"string"},
{"name":"host","type":"string"}
]
}
Following commands were run:
Start Zookeeper in its own terminal
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
2 Start Kafka in its own terminal
./bin/kafka-server-start ./etc/kafka/server.properties
3 Start schema registry in its own terminal
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
4 From logstash directory, run the following command
bin/logstash -f ./bin/logstash.conf
5 Type the log message that you wish to send to kafka after running above command ex: "Hello World"
6 Consume the topic from Kafka
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
_While consuming we get the following error:_
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Please let me know how to solve this problem
Thanks, Upendra