We are having problems enforcing the order in which messages from a Kafka topic are sent to Elasticsearch using the Kafka Connect Elasticsearch Connector. In the topic, the messages are in the right order with the correct offsets, but if two messages with the same ID are created in quick succession, they are intermittently sent to Elasticsearch in the wrong order. As a result, Elasticsearch ends up with the data from the second-to-last message rather than the last one. If we add an artificial delay of a second or two between the two messages in the topic, the problem disappears.
The connector's documentation states:
"Document-level update ordering is ensured by using the partition-level Kafka offset as the document version, and using version_mode=external."
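To make the quoted mechanism concrete, here is a toy in-memory model of the rule that external versioning enforces. It is plain Python, not the connector's or Elasticsearch's code. In Elasticsearch's own index API the parameter is called version_type (version_type=external), which appears to be what the connector documentation means by version_mode. With it, a write is accepted only if its version is strictly greater than the stored one; otherwise Elasticsearch rejects it with a version conflict. The class ToyIndex, the offsets 41 and 42, and the payloads are all hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StoredDoc:
    version: int
    source: dict


class ToyIndex:
    """Hypothetical in-memory stand-in for one Elasticsearch index (not a real client)."""

    def __init__(self) -> None:
        self.docs: Dict[str, StoredDoc] = {}

    def index(self, doc_id: str, source: dict, external_version: Optional[int] = None) -> bool:
        current = self.docs.get(doc_id)
        if external_version is None:
            # No versioning: whichever request arrives last wins.
            self.docs[doc_id] = StoredDoc(0, source)
            return True
        # version_type=external: accept only a strictly greater version;
        # Elasticsearch would answer 409 (version conflict) otherwise.
        if current is not None and external_version <= current.version:
            return False
        self.docs[doc_id] = StoredDoc(external_version, source)
        return True


DOC_ID = "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1"
FIRST = {"state": "after first update"}    # hypothetical Kafka offset 41
SECOND = {"state": "after second update"}  # hypothetical Kafka offset 42

# Arrival order seen in the log: the second update reaches Elasticsearch first.
arrivals = [(42, SECOND), (41, FIRST)]

naive, versioned = ToyIndex(), ToyIndex()
for offset, payload in arrivals:
    naive.index(DOC_ID, payload)
    versioned.index(DOC_ID, payload, external_version=offset)

print(naive.docs[DOC_ID].source)      # {'state': 'after first update'}  (stale)
print(versioned.docs[DOC_ID].source)  # {'state': 'after second update'}
```

With versioning, the late-arriving first update is rejected instead of overwriting the newer document. Without it, the result matches the stale data described above.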
However, I can't find any documentation about this version_mode setting, or about whether it's something we need to set ourselves.
In the Kafka Connect log files we can see the two messages for the same ID being processed in the wrong order, about 25 ms apart. It may be significant that they appear to be processed in different threads (they go out over separate connections, http-outgoing-1 and http-outgoing-2). Note also that this topic has only one partition, so all messages are in the same partition.
Below is the log snippet, slightly edited for clarity. The messages in the Kafka topic are populated by Debezium, which I don't think is relevant to the problem, but which conveniently includes a timestamp value (ts_ms). The snippet shows that the messages are processed in the wrong order, even though they are in the correct order in the Kafka topic:
[2019-01-17 09:10:05,671] DEBUG http-outgoing-1 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE SECOND UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER SECOND UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716205205
}
" (org.apache.http.wire)
...
[2019-01-17 09:10:05,696] DEBUG http-outgoing-2 >> "
{
"op": "u",
"before": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM BEFORE FIRST UPDATE >> ...
},
"after": {
"id": "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1",
... << DATA FROM AFTER FIRST UPDATE >> ...
},
"source": { ... },
"ts_ms": 1547716204190
}
" (org.apache.http.wire)
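The two payloads above can be checked mechanically for reordering. This is a minimal sketch that assumes the Debezium ts_ms field is a usable proxy for source order. It is the time Debezium processed the change, not the Kafka offset, which the log does not show. find_inversions is a hypothetical helper, not part of any library.

```python
from typing import Dict, Iterable, List, Tuple


def find_inversions(sent: Iterable[Tuple[str, int]]) -> List[Tuple[str, int, int]]:
    """Hypothetical helper: given (doc_id, ts_ms) pairs in the order they were sent,
    return (doc_id, newest_ts_ms_so_far, ts_ms) for every send that goes back in time."""
    newest: Dict[str, int] = {}
    inversions: List[Tuple[str, int, int]] = []
    for doc_id, ts_ms in sent:
        prev = newest.get(doc_id)
        if prev is not None and ts_ms < prev:
            # An older change was sent after a newer one for the same document.
            inversions.append((doc_id, prev, ts_ms))
        else:
            newest[doc_id] = ts_ms
    return inversions


DOC_ID = "ac025cb2-1a37-11e9-9c89-7945a1bd7dd1"
sent_in_log_order = [
    (DOC_ID, 1547716205205),  # http-outgoing-1 at 09:10:05,671 (second update)
    (DOC_ID, 1547716204190),  # http-outgoing-2 at 09:10:05,696 (first update)
]

for doc_id, newer, older in find_inversions(sent_in_log_order):
    print(f"{doc_id}: ts_ms={older} was sent after ts_ms={newer}, "
          f"although it is {newer - older} ms older")
```

Applied to the log, it reports one inversion: the first update was sent 25 ms after the second update, even though it was produced 1015 ms earlier.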
Does anyone know how to force this connector to maintain message order for a given document ID when sending the messages to Elasticsearch?
{ "id": "ac025cbe-1a37-11e9-9c89-7945a1bd7dd1" } – Yoni Gibbs
What tasks.max do you have configured for the connector? – Robin Moffatt
tasks.max is 1 for both the Elasticsearch connector and the Debezium connector. (The Debezium connector reads data from Postgres and puts it into the Kafka topic; this is then sent to Elasticsearch by the Elasticsearch connector.) – Yoni Gibbs
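For reference, tasks.max is a per-connector setting. It is submitted together with the rest of the connector configuration, for example as the JSON body of PUT /connectors/{name}/config on the Kafka Connect REST API. The sketch below only builds and prints such a body. The connector name, topic, and URL are hypothetical, and type.name="_doc" assumes an Elasticsearch 6.x-era connector that still requires a type. It also sets key.ignore=false. Our understanding is that the connector attaches the Kafka offset as an external version only when the record key is used as the document ID. With key.ignore=true every record gets its own topic+partition+offset ID, so there is nothing to order.

```python
import json

# Hypothetical connector name, topic, and Elasticsearch URL; replace with your own.
CONNECTOR_NAME = "es-sink"

config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "dbserver1.public.items",
    "connection.url": "http://localhost:9200",
    # Assumed: required by connector versions targeting Elasticsearch 6.x.
    "type.name": "_doc",
    # A single task, matching the setting described in the comments above.
    "tasks.max": "1",
    # Use the record key as the document ID (assumed prerequisite for offset versioning).
    "key.ignore": "false",
}

payload = json.dumps(config, indent=2)

# This body would be sent with an HTTP PUT to the Connect worker; here it is only printed.
print(f"PUT /connectors/{CONNECTOR_NAME}/config")
print(payload)
```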