I'm moving data from MongoDB to Elasticsearch using Kafka Connect. At the moment, updated records are inserted as new documents in the Elasticsearch indices. However, I want to update the existing documents based on an ID (similar to insert.mode=upsert in the JDBC Sink Connector). Is that possible?

1 Answer

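I solved the issue by setting key.ignore=false and then transforming the event key from {id: 1234} to 1234 using the ExtractField SMT. With key.ignore=false the connector uses the record key as the Elasticsearch document ID, so a later record with the same key overwrites the existing document instead of creating a new one.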

    key.ignore=false
    transforms=ExtractField
    transforms.ExtractField.field=id
    transforms.ExtractField.type=org.apache.kafka.connect.transforms.ExtractField$Key
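
To see why both settings matter, here is a rough Python model of the behaviour. It is only an illustration, not connector code: `extract_field` and `index_record` are made-up helper names, the index is a plain dict, and the `topic+0+offset` ID just mimics the topic+partition+offset scheme the sink uses when the key is ignored.

```python
def extract_field(key, field):
    """Rough stand-in for ExtractField$Key: pull one field out of a map key."""
    return key[field]


def index_record(index, key, value, offset, key_ignore):
    """Write one record into a dict that stands in for an Elasticsearch index."""
    if key_ignore:
        # Key ignored: the ID is built from topic+partition+offset,
        # so every record gets a fresh document ID.
        doc_id = f"topic+0+{offset}"
    else:
        # Key used: the record key becomes the document ID.
        doc_id = str(key)
    index[doc_id] = value
    return doc_id


# Two change events for the same MongoDB document (hypothetical data).
records = [
    ({"id": 1234}, {"name": "old"}),
    ({"id": 1234}, {"name": "new"}),
]

ignored = {}  # models key.ignore=true
keyed = {}    # models key.ignore=false plus ExtractField$Key on "id"

for offset, (key, value) in enumerate(records):
    index_record(ignored, key, value, offset, key_ignore=True)
    index_record(keyed, extract_field(key, "id"), value, offset, key_ignore=False)

print(len(ignored))  # two documents: the update was stored as a new document
print(keyed)         # one document under ID "1234", holding the latest value
```

In this model the first index ends up with two documents and the second with one, which matches what I saw before and after the change.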