I have a Kafka topic with multiple types of messages flowing through it, and I'm writing them to Elasticsearch using Kafka Connect. Streaming works fine until I need to separate each distinct set of messages into its own index, i.e. I need to derive a new index name for each set of data based on fields in the (JSON) messages.
How do I configure/customize Kafka Connect to do this? Each message contains a field that represents the type of message, plus a timestamp.
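For reference, my current sink configuration looks roughly like this (the connector name, topic, and connection URL are simplified placeholders):

```json
{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "events",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```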
Sample JSON messages:

Sample 1:

```json
{
  "log": {"data": "information", "version": "1.1"},
  "type": "xyz",
  "timestamp": "2019-08-28t10:07:40.370z",
  "value": {}
}
```

Sample 2:

```json
{
  "log": {"data": "information", "version": "1.1", "value": {}},
  "type": "abc",
  "timestamp": "2019-08-28t10:07:40.370z"
}
```
I would like to customize/configure the Kafka Connect ES sink so that the Sample 1 document is written to index 'xyz.20190828' and the Sample 2 document to index 'abc.20190828'.
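My understanding is that the sink derives the index name from the topic name, so I've been looking at chaining Single Message Transforms to rewrite the topic per record: something like the fragment below (added to the connector config), using Confluent's ExtractTopic transform from the separate kafka-connect-transforms plugin, followed by Kafka's built-in TimestampRouter. I'm not sure this is the right approach:

```json
"transforms": "routeByType,appendDate",
"transforms.routeByType.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.routeByType.field": "type",
"transforms.appendDate.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.appendDate.topic.format": "${topic}.${timestamp}",
"transforms.appendDate.timestamp.format": "yyyyMMdd"
```

One concern: as far as I can tell, TimestampRouter uses the Kafka record's timestamp rather than the `timestamp` field inside the JSON value, so if those differ I may need a custom SMT. Is there a way to do this with configuration alone?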
I'm using Kafka 2.2.0 and the confluentinc-kafka-connect-elasticsearch-5.2.1 plugin.
Appreciate the help.