3
votes

I have a Kafka topic with multiple types of messages flowing in, which are written to Elasticsearch using Kafka Connect. Streaming looks good until I have to separate a unique set of messages into a unique index, i.e. I have to derive a new index for each new set of data based on fields in the messages (they are JSON messages).

How do I configure/customize Kafka Connect to do this for me? Each message contains a field representing the type of the message and a timestamp.

The sample JSON looks like this:

Sample 1: {"log":{"data":"information", "version":"1.1"}, "type":"xyz", "timestamp":"2019-08-28t10:07:40.370z", "value":{}}

Sample2: {"log":{"data":"information", "version":"1.1", "value":{}}, "type":"abc", "timestamp":"2019-08-28t10:07:40.370z" }

I would like to customize/configure the Kafka Connect ES sink to write the Sample 1 doc to index 'xyz.20190828' and the Sample 2 doc to index 'abc.20190828'.

I'm using Kafka 2.2.0 and the confluentinc-kafka-connect-elasticsearch-5.2.1 plugin.

Appreciate the help.

1
Unfortunately, such complex operations are not possible via Kafka Connect; you should first process the data in Kafka Streams and then use Kafka Connect to push it to ELK - Bartosz Wardziński

1 Answer

3
votes

You could do this using a custom Single Message Transform (SMT), which you would need to write yourself. By changing the topic of a message based on its contents, you route it to a different Elasticsearch index, since the sink uses the topic name as the index name.

Currently, Apache Kafka ships with SMTs that can rename entire topics (RegexRouter) or append timestamps to topic names (TimestampRouter). You may find these a useful starting point for writing your own.
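A rough, untested sketch of such a transform, assuming the sink uses the JSON converter with schemas.enable=false (so the value arrives as a java.util.Map) and using a made-up class/package name, could look like this:

```java
package com.example; // hypothetical package

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Sketch of an SMT that rewrites the topic to "<type>.<yyyyMMdd>", e.g. "xyz.20190828",
 * based on the "type" and "timestamp" fields of a schemaless JSON value.
 */
public class TypeDateRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // With the JSON converter and schemas.enable=false, the value is a Map.
        if (!(record.value() instanceof Map)) {
            return record; // leave anything unexpected untouched
        }
        Map<?, ?> value = (Map<?, ?>) record.value();
        Object type = value.get("type");
        Object timestamp = value.get("timestamp"); // e.g. "2019-08-28t10:07:40.370z"
        if (type == null || timestamp == null) {
            return record;
        }

        // Turn the leading "yyyy-MM-dd" part of the timestamp into "yyyyMMdd".
        String date = timestamp.toString().substring(0, 10).replace("-", "");
        String newTopic = type + "." + date; // e.g. "xyz.20190828"

        return record.newRecord(newTopic, record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no options in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
```

You would then build it into a JAR on the Connect worker's plugin path and register it in the connector configuration with something like transforms=route and transforms.route.type=com.example.TypeDateRouter (substituting your actual class name).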

The alternative is as @wardzniak suggests in his comment—use stream processing (e.g. Kafka Streams, KSQL) to pre-process the source topic before using Kafka Connect to send the resulting separate topics to Elasticsearch.
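As a minimal sketch of that approach (assuming raw string values and placeholder application id, broker, and topic names), a Kafka Streams app could fan the source topic out into one topic per type, and the Elasticsearch sink would then consume those topics:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

/**
 * Sketch of a Kafka Streams app that routes each record to a topic named
 * after its "type" field ("xyz", "abc", ...), which Kafka Connect can then
 * map onto separate Elasticsearch indices.
 */
public class TypeSplitter {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "type-splitter");     // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");     // placeholder topic name

        // Dynamically choose the output topic per record based on its "type" field.
        source.to((key, value, recordContext) -> extractType(value),
                Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }

    // Naive extraction of the "type" field from the raw JSON string;
    // a real implementation would use a proper JSON library.
    private static String extractType(String json) {
        int start = json.indexOf("\"type\":\"");
        if (start < 0) {
            return "unknown"; // fallback topic for records without a "type" field
        }
        start += 8;
        return json.substring(start, json.indexOf('"', start));
    }
}
```

Each per-type topic then becomes its own index, and adding the stock TimestampRouter to the sink's transforms could append the date suffix to get names like 'xyz.20190828'; just note that TimestampRouter uses the Kafka record timestamp, not the timestamp field inside the JSON.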