I'm trying to set up a test to move data from MySQL to Elasticsearch.
I have a Dockerized setup with a broker, ZooKeeper, Connect, KSQL server and CLI, Schema Registry, and Elasticsearch. I'm using the Confluent 5.1.0 Docker images, and for Elasticsearch I'm using elasticsearch:6.5.4.
I configured a JDBC source connector to get data from MySQL into Kafka. This part is working: I see my topics created, and with ksql-cli I can see new messages arriving in the stream as I update rows in MySQL.
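For reference, the JDBC source connector is configured along these lines (a sketch only; the database, table, and column names here are placeholders rather than my exact values):

{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/testdb",
    "connection.user": "user",
    "connection.password": "password",
    "table.whitelist": "test_topic",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "topic.prefix": ""
  }
}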
I also configured an Elasticsearch sink connector. The connector is created successfully and the index exists in Elasticsearch, but I see no documents in it.
This is the ES sink connector configuration:
{
  "name": "es-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "connection.url": "http://es:9200",
    "type.name": "_doc",
    "topics": "test_topic",
    "drop.invalid.message": true,
    "behavior.on.null.values": "ignore",
    "behavior.on.malformed.documents": "ignore",
    "schema.ignore": true
  }
}
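I register the connector by POSTing this JSON to the Kafka Connect REST API, roughly like this (assuming the config above is saved to a file named es-connector.json, which is just for illustration):

curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @es-connector.json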
This is what I see when I query the status of the sink connector (curl -X GET http://connect:8083/connectors/es-connector/status):
{
  "name": "es-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}
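Both the connector and its task report RUNNING. Another thing worth checking from here is whether the sink's consumer group is committing offsets for the topic; a minimal sketch, assuming the default connect-<connector-name> group naming for sink connectors and the broker hostname from my Docker setup:

kafka-consumer-groups --bootstrap-server broker:9092 \
  --describe --group connect-es-connector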
In Elasticsearch I can see that the index exists; querying http://es:9200/test_topic/_search returns:
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}
I keep making updates and inserts in MySQL and I see the messages in the stream with ksql-cli, but no documents are created in Elasticsearch. I even created a topic manually with kafka-avro-console-producer and published some messages, then created a second sink connector for that topic, with the same result: the index exists but contains no documents.
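The ad-hoc topic was populated roughly like this (a sketch; the topic name test_topic2 is illustrative, and the value schema matches the {"f1":"value1"} messages I produced):

kafka-avro-console-producer \
  --broker-list broker:9092 \
  --topic test_topic2 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'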
I see no errors in the Kafka Connect logs, so I don't understand why it's not working. Is there something wrong with the connector configuration? Am I missing something?
Edit:
For the Elasticsearch sink configuration I tried with and without these lines:
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
And the result is the same, no documents.
Edit:
I found the error:
Key is used as document id and cannot be null.
A message in my original topic looks like {"email": "[email protected]"}, and a message in the ad-hoc topic I created looks like {"f1":"value1"}.
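For context, this error means the sink is trying to use the Kafka record key as the Elasticsearch document id, and my records have null keys. That behavior is governed by the connector's key.ignore option (false by default); when set to true, the connector derives document ids from topic+partition+offset instead of the key. A minimal sketch of that change, added to the sink connector's "config" block:

"key.ignore": true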