I'm moving data from Mongodb -> Elasticsearch using kafka connect. At the moment the updated records are inserted as new documents in the Elasticsearch indices. However I want to update the exisiting records based on a ID (Similar to write.mode=upsert in JDBC Sink Connector). Is that possible?