I created the simplest kafka sink connector config and I'm using confluent 4.1.0:
{
"connector.class":
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "test-type",
"tasks.max": "1",
"topics": "dialogs",
"name": "elasticsearch-sink",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"schema.ignore": "true"
}
and in the topic I save the messages in JSON
{ "topics": "resd"}
But in the result I get an error:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!