
I'm trying to set up a test to move data from MySQL to Elasticsearch.

I have a dockerized setup with a broker, ZooKeeper, Connect, KSQL server and CLI, Schema Registry, and Elasticsearch. I'm using the Confluent 5.1.0 Docker images, and for Elasticsearch I'm using elasticsearch:6.5.4.

I configured a JDBC source connector to get data from MySQL into Kafka. This part is working: I see my topics created, and using ksql-cli I can see new messages in the stream as I update rows in MySQL.

I also configured an Elasticsearch sink connector. The connector is created successfully and the index exists in Elasticsearch, but I see no documents in the index.

This is the ES sink connector configuration:

{
    "name": "es-connector",
    "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "connection.url": "http://es:9200",
            "type.name": "_doc",
            "topics": "test_topic",
            "drop.invalid.message": true,
            "behavior.on.null.values": "ignore",
            "behavior.on.malformed.documents": "ignore",
            "schema.ignore": true
    }
}
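For reference, a config like this can be registered through the Kafka Connect REST API. This is a sketch; the connect:8083 hostname is the one from this setup, and es-connector.json is assumed to hold the JSON above:

```shell
# Sketch: register the sink connector with the Connect worker.
# Assumes the config above is saved as es-connector.json.
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @es-connector.json
```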

This is what I see when I query the status of the sink connector: curl -X GET http://connect:8083/connectors/es-connector/status

{
    "name": "es-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "state": "RUNNING",
            "id": 0,
            "worker_id": "connect:8083"
        }
    ],
    "type": "sink"
}

In Elasticsearch I can see the index, but querying http://es:9200/test_topic/_search returns no hits:

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

I keep making updates and inserts in MySQL, I see the messages in the stream using ksql-cli but no documents are created in Elasticsearch. I even created a topic manually using kafka-avro-console-producer and published messages, then created a second sink connector for this topic and the same result, I see the index but no documents.
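For completeness, a keyed test message could be produced like this. This is a sketch: the topic name and schemas are illustrative, and parse.key, key.separator, and key.schema are standard console-producer properties:

```shell
# Sketch: publish an Avro message *with a key*, so a sink that uses the
# message key as the document ID has something to work with.
kafka-avro-console-producer \
  --broker-list broker:9092 \
  --topic test_topic \
  --property schema.registry.url=http://schema-registry:8081 \
  --property parse.key=true \
  --property key.separator=: \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"type":"record","name":"rec","fields":[{"name":"f1","type":"string"}]}'
# then type a line such as:  "key1":{"f1":"value1"}
```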

I see no errors in Kafka Connect, so I don't understand why it's not working. Is there something wrong with the connector configuration? Am I missing something?

Edit:

For the Elasticsearch sink configuration I tried with and without these lines:

"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true

And the result is the same, no documents.

Edit

I found the error:

Key is used as document id and cannot be null


Comments:

Could you add a sample message that is used as the source for the Elasticsearch sink connector? – Bartosz Wardziński

@wardziniak I don't see why this is important, but here it is. For the JDBC connector a message looks like {"email", "[email protected]"}, and for the ad-hoc topic I created, a message looks like {"f1":"value1"} – lloiacono

I would suggest increasing the log level for Kafka Connect and checking the content of the records in the log; maybe they are null – Bartosz Wardziński

1 Answer


With

"key.ignore": true

the Elasticsearch sink will use the topic+partition+offset as the Elasticsearch document ID. As you found, you will get a new document for every message.

With

"key.ignore": false

the Elasticsearch sink will use the key of the Kafka message as the Elasticsearch document ID. If your Kafka messages don't have a key, you will understandably get the error Key is used as document id and cannot be null. There are various ways to set the key of a Kafka message, including a Single Message Transform (SMT) if you're ingesting through Kafka Connect, detailed here.
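As a sketch, the SMT route could look like this in the sink configuration. The field name email is an assumption taken from the sample message in the comments; ValueToKey and ExtractField are standard Kafka Connect transforms:

```json
{
    "key.ignore": "false",
    "transforms": "createKey,extractKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "email",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "email"
}
```

With this in place, each record's key is built from the email field of its value, so the sink has a non-null document ID.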