0 votes

I have the following cluster:

Kafka -> some log collector -> Elasticsearch

My problem is choosing the most efficient log collector (or some other software that can manage the data flow between Kafka and Elasticsearch).

I'm trying to choose between Logstash, Fluentd and Confluent's Kafka Elasticsearch connector. The main problem I'm facing is that I can't roll back the offset in Kafka after the pipeline has problems writing to the Elasticsearch endpoint.

For example, the Logstash docs say that "400 and 404 errors are sent to the dead letter queue (DLQ), if enabled. If a DLQ is not enabled, a log message will be emitted, and the event will be dropped" (https://www.elastic.co/guide/en/logstash/6.x/plugins-outputs-elasticsearch.html#_retry_policy). If I hit such an error, Logstash keeps reading data from Kafka and the error occurs again and again. Even though all my data ends up in the DLQ, Kafka's offset moves far past the position where the first error occurred, and I would have to set the correct offset back manually.
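For reference, a minimal sketch of the Logstash setup I mean is below; the broker address, topic, group id and index name are placeholders, and the DLQ is switched on in logstash.yml rather than in the pipeline file (check the option names against your Logstash version):

    # logstash.yml -- the DLQ must be enabled at the instance level
    dead_letter_queue.enable: true

    # pipeline.conf -- Kafka in, Elasticsearch out (placeholder hosts/topic/index)
    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics            => ["app-logs"]
        group_id          => "logstash-es"
      }
    }
    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "app-logs-%{+YYYY.MM.dd}"
      }
    }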

So, my question is: is there any connector for Kafka and Elasticsearch that stops advancing the offset as soon as it receives the first error (400/404) from Elasticsearch?

Thanks in advance.


1 Answer

0 votes

I don't think the question is really about efficiency, but rather reliability.

The main problem I'm facing is that I can't roll back the offset in Kafka after the pipeline has problems writing to the Elasticsearch endpoint.

I don't have much experience with the DLQ features of Connect or Logstash, but resetting the consumer group offset is not impossible. However, that shouldn't be necessary if the consumer application correctly handles offset commits.
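If you do need to rewind, the kafka-consumer-groups tool that ships with Kafka can reset a group's offsets (the group must have no active consumers while you do it). A sketch with placeholder broker, group and topic names:

    # Rewind the consumer group to a specific offset (placeholder names)
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --group logstash-es --topic app-logs \
      --reset-offsets --to-offset 12345 --execute

    # Other reset strategies, e.g. by time or to the earliest retained offset:
    #   --to-datetime 2019-01-01T00:00:00.000
    #   --to-earliest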

If Connect throws a connection error to ES, it'll retry, not commit offsets.

If the error is unrecoverable, then Connect will stop consuming, and again, not commit offsets.
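As a rough sketch of that behaviour, a Confluent Elasticsearch sink connector left at the default errors.tolerance=none fails the task on an unrecoverable record instead of committing past it. The names and URL below are placeholders; verify the exact keys against the connector version you run:

    # elasticsearch-sink.properties (placeholder names/URL)
    name=es-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    topics=app-logs
    connection.url=http://localhost:9200

    # Default error handling: a failure stops the task, so offsets are not committed past it
    errors.tolerance=none

    # Fail (rather than silently skip) documents Elasticsearch rejects, e.g. mapping errors
    behavior.on.malformed.documents=fail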

So, the only way you would miss data from a message batch is if that batch ended up in a DLQ, whichever framework you use.
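If you would rather preserve failed records than block the pipeline, Connect can route them to a dead letter topic instead. Again a sketch with placeholder names; note that, depending on connector version, not every failure type is routed to the DLQ, so check the docs for yours:

    # Route failed records to a DLQ topic instead of failing the task (placeholder names)
    errors.tolerance=all
    errors.deadletterqueue.topic.name=app-logs-dlq
    errors.deadletterqueue.context.headers.enable=true
    errors.log.enable=true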

If the DLQ is disabled, the only way to lose data would be if it expires from Kafka's retention before you re-consume it.