Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's retry mechanism doesn't kick in until a message has been added to its internal buffer.

If an exception occurs before that point, KafkaProducer throws it, and it seems that Flink doesn't handle it. In that case, data is lost.

Related Flink code (FlinkKafkaProducerBase):

    if (logFailuresOnly) {
        callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                }
                acknowledgeMessage();
            }
        };
    }
    else {
        callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null && asyncException == null) {
                    asyncException = exception;
                }
                acknowledgeMessage();
            }
        };
    }

Here are the scenarios we've identified that will cause data loss:

  1. All Kafka brokers are down.

    In this case, before appending a message to its buffer, KafkaProducer tries to fetch metadata. If it can't fetch the metadata within the configured timeout, it throws an exception.

  2. Memory records not writable (an existing bug in the Kafka 0.9.0.1 library):

    https://issues.apache.org/jira/browse/KAFKA-3594

In both of the above cases, KafkaProducer won't retry, and Flink will ignore the messages. The exception is logged, but the messages that failed are not.
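To make the two failure paths concrete, here is a minimal, self-contained sketch. It does not use the real Kafka or Flink classes: `RecordSender`, `SendCallback`, and `LossTrackingSender` are hypothetical stand-ins introduced only for illustration. The idea is that a record can fail either synchronously (the send call throws before buffering) or asynchronously (the callback receives an exception), and a wrapper that wants to avoid silent loss probably needs to capture the record on both paths.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a producer: send() may throw before the record is
// buffered (synchronous failure) or report an error later through the callback.
interface RecordSender {
    void send(String record, SendCallback callback);
}

// Hypothetical stand-in for a completion callback; error is null on success.
interface SendCallback {
    void onCompletion(Exception error);
}

// Wraps a sender so the failed record itself is kept on either failure path,
// not just the exception.
class LossTrackingSender {
    private final RecordSender delegate;
    private final List<String> failedRecords = new ArrayList<>();

    LossTrackingSender(RecordSender delegate) {
        this.delegate = delegate;
    }

    void send(String record) {
        try {
            // Asynchronous path: the callback reports a failure after buffering.
            delegate.send(record, error -> {
                if (error != null) {
                    recordFailure(record);
                }
            });
        } catch (RuntimeException e) {
            // Synchronous path: the sender threw before the record was buffered,
            // so the callback will never be invoked for this record.
            recordFailure(record);
        }
    }

    private synchronized void recordFailure(String record) {
        failedRecords.add(record);
    }

    // Returns a copy so callers cannot modify the internal list.
    synchronized List<String> failedRecords() {
        return new ArrayList<>(failedRecords);
    }
}
```

In a real job the captured records would presumably go to a log or a side channel for replay rather than an in-memory list; that part is left out here.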

Possible workarounds (Kafka settings):

  1. A very high value for metadata timeout (metadata.fetch.timeout.ms)
  2. A very high value for buffer expiry (request.timeout.ms)
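As a rough sketch, the two workaround settings could be assembled into a `java.util.Properties` object like this. The class name `WorkaroundProducerConfig` and the values passed in are placeholders we made up, not recommendations; only the two property keys come from the list above.

```java
import java.util.Properties;

// Hypothetical helper that builds producer properties with both timeouts raised.
class WorkaroundProducerConfig {
    static Properties build(long metadataFetchTimeoutMs, int requestTimeoutMs) {
        Properties props = new Properties();
        // How long the producer waits for topic metadata before giving up.
        props.setProperty("metadata.fetch.timeout.ms", Long.toString(metadataFetchTimeoutMs));
        // Raised here as the "buffer expiry" workaround from the list above.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }
}
```

For example, `WorkaroundProducerConfig.build(300_000L, 300_000)` would set both to five minutes. One side effect we'd expect is that a send call blocks for correspondingly longer while the brokers are unreachable.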

We're still investigating the possible side effects of changing the above Kafka settings.

So, is our understanding correct? Or is there a way we can avoid this data loss by modifying some Flink settings?

Thanks.

1 Answer

Here are my thoughts on your question. First, consider one of Kafka's guarantees:

For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any records committed to the log.

First, the guarantee only covers records that have been committed to the log. Any record that failed to be delivered is not considered committed. Second, if all of your brokers are down, there will be some data loss.

These are the settings we use to prevent data loss on the producer side (the last three are broker or topic settings rather than producer settings):

  • block.on.buffer.full = true
  • acks = all
  • retries = MAX_VALUE
  • max.in.flight.requests.per.connection = 1
  • Use KafkaProducer.send(record, callback) instead of send(record)
  • unclean.leader.election.enable = false
  • replication.factor > min.insync.replicas
  • min.insync.replicas > 1
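For reference, the producer-side entries in the list could be written as a `java.util.Properties` object along these lines. This is only a sketch: the class name `DurableProducerConfig` and the helper `replicationSettingsOk` are hypothetical names for illustration, and the broker and topic settings are not set here because they are configured outside the producer.

```java
import java.util.Properties;

// Hypothetical helper collecting the producer-side settings from the list above.
class DurableProducerConfig {
    static Properties build() {
        Properties props = new Properties();
        // Block instead of throwing when the producer's buffer is full.
        props.setProperty("block.on.buffer.full", "true");
        // Wait for all in-sync replicas to acknowledge each record.
        props.setProperty("acks", "all");
        // Keep retrying failed sends.
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        // One in-flight request per connection, so retries cannot reorder records.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    // Checks the relationship from the list:
    // replication.factor > min.insync.replicas > 1.
    static boolean replicationSettingsOk(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor > minInsyncReplicas && minInsyncReplicas > 1;
    }
}
```

With these rules, a replication factor of 3 with `min.insync.replicas = 2` passes the check, while 2 and 2 does not, because a single broker failure would then block all writes that use `acks = all`.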