3 votes

I created the simplest Kafka sink connector config, and I'm using Confluent 4.1.0:

{
  "connector.class": 
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}

In the topic, the messages are saved as plain JSON, for example:

{ "topics": "resd"}

But I get the following error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!


2 Answers

4 votes

As cricket_007 says, you need to tell Kafka Connect to use the JSON converter if that's the format your data is in. Add this to your connector configuration:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false"
1 vote

That error happens because the converter is trying to read Avro messages that were not encoded in the Confluent Schema Registry wire format. Messages in that format start with a zero "magic byte" followed by a four-byte schema ID; plain JSON (or anything else) fails that check, hence "Unknown magic byte!".

If the topic data is Avro, the converter needs to use the Schema Registry.
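
In that case the connector (or worker) config would point the AvroConverter at the registry, along these lines (the registry URL here is an assumption; use your own):

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
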

Otherwise, if the topic data is JSON, then you've started the Connect worker with the AvroConverter set for keys or values in its properties file, and you need to use the JsonConverter there instead, as sketched below.
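
For example, in the worker properties file (e.g. connect-distributed.properties; the exact file depends on how you start Connect), the converter settings would look like this:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With schemas.enable=false, Connect expects bare JSON messages; with it set to true, each message must be an envelope with "schema" and "payload" fields.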