13
votes

I saw in a video tutorial that the Kafka broker supports 3 types of acknowledgement when a producer posts a message.

0 - Fire and Forget
1 - Leader Ack
2 - Ack of all the brokers

I am using Kafka's Java API to post messages. Is this something that has to be set for each broker in its own server.properties, or is it something that has to be set by the producer? If it has to be set by the producer, please explain how it can be set using the Java API.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerApp {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        try{
            for(int i=0;i<150;i++) {
                RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                System.out.println(" Offset = " + ack.offset());
                System.out.println(" Partition = " + ack.partition());
            }
        } catch (Exception ex){
            ex.printStackTrace();
        } finally {
            kafkaProducer.close();
        }



    }

}
2
Actually I thought it supported a continuous range of values: -1 == "all" == leader and all in-sync replicas, 0 == fire-and-forget, 1 == just the leader, 2 == leader and one more replica, 3 == leader and two more replicas, and so on? - Michal Borowiecki
I suppose acks > 1 was removed from v0.9 onward: cwiki.apache.org/confluence/display/KAFKA/… - Michal Borowiecki

2 Answers

12
votes

It's a producer property, and it is set the same way as the other properties you have in your code:

properties.put("acks","all");

The list of all configurable producer properties can be found in the producer configuration section of the Kafka documentation.
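To make the setting concrete, here is a minimal sketch that builds the same producer properties as in the question and adds acks. The class and method names (AcksConfigSketch, producerProps) are made up for illustration; only the property keys and values are Kafka's. It uses plain java.util.Properties so it runs without the Kafka client on the classpath. If you prefer constants over string keys, the client library also provides ProducerConfig.ACKS_CONFIG.

```java
import java.util.Properties;

// Hypothetical helper class; only the property keys and values come from Kafka.
class AcksConfigSketch {

    // Builds producer properties like those in the question, plus an explicit acks setting.
    static Properties producerProps(String acks) {
        // The producer accepts "0", "1" and "all" ("-1" is an alias for "all").
        if (!acks.equals("0") && !acks.equals("1") && !acks.equals("all") && !acks.equals("-1")) {
            throw new IllegalArgumentException("Unsupported acks value: " + acks);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("all");
        // Pass props to new KafkaProducer<>(props), exactly as in the question's code.
        System.out.println("acks = " + props.getProperty("acks")); // prints "acks = all"
    }
}
```

Note that a value such as "2" is rejected here on purpose: current producers only accept 0, 1 and all (or -1).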

You might also want to look at the broker (or topic) property min.insync.replicas, which is related to this producer config.
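How the two settings interact can be sketched with a small model. This is not Kafka code, just a simplified illustration under the assumption that the partition leader is available: with acks=all the leader rejects a write when fewer than min.insync.replicas replicas are in sync, while with acks=0 or acks=1 that minimum is not enforced. The class and method names are hypothetical.

```java
// Simplified model of the acks / min.insync.replicas interaction (not Kafka code).
class MinIsrSketch {

    // Returns true if a write would be accepted by the leader.
    // inSyncReplicas counts the leader itself.
    static boolean writeAccepted(String acks, int inSyncReplicas, int minInsyncReplicas) {
        boolean requiresAll = acks.equals("all") || acks.equals("-1");
        // min.insync.replicas only matters when the producer asks for acks=all.
        return !requiresAll || inSyncReplicas >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(writeAccepted("all", 1, 2)); // false: only the leader is in sync
        System.out.println(writeAccepted("all", 2, 2)); // true: the minimum is met
        System.out.println(writeAccepted("1", 1, 2));   // true: minimum not enforced for acks=1
    }
}
```

In practice this means acks=all on a topic with replication factor 3 and min.insync.replicas=2 tolerates one replica being down while still refusing writes that would exist on the leader only.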

7
votes

I think it helps to understand what the acks property actually does behind the scenes. Once that is clear, you will see that this property is configured by the producer.

For example, suppose you must not lose any messages, as with an audit log. The following code shows how we would start our producer config:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all"); //We are using acks=all in order to get the strongest guarantee we can.
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "5");

This is a small but powerful change that has a major impact on whether a message is guaranteed to arrive.

The book Kafka in Action has three diagrams that illustrate the acks property more clearly (the images are not reproduced here).
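As a rough textual stand-in for diagrams of this kind, the three acknowledgement levels can be summarized in a toy model. This is not Kafka code and is not taken from the book; the class and method names are hypothetical. It only encodes how many replica acknowledgements a send waits for before it is reported as successful.

```java
// Toy model of the acks levels (not Kafka code).
class AckLevelSketch {

    // Number of replica acknowledgements awaited before a send counts as successful.
    // inSyncReplicas counts the leader itself.
    static int acknowledgementsAwaited(String acks, int inSyncReplicas) {
        switch (acks) {
            case "0":
                return 0;               // fire and forget: the producer does not wait
            case "1":
                return 1;               // only the partition leader has written the record
            case "all":
            case "-1":
                return inSyncReplicas;  // leader plus every other in-sync replica
            default:
                throw new IllegalArgumentException("Unsupported acks value: " + acks);
        }
    }

    public static void main(String[] args) {
        for (String acks : new String[] {"0", "1", "all"}) {
            System.out.println("acks=" + acks + " waits for "
                    + acknowledgementsAwaited(acks, 3) + " acknowledgement(s)");
        }
        // prints:
        // acks=0 waits for 0 acknowledgement(s)
        // acks=1 waits for 1 acknowledgement(s)
        // acks=all waits for 3 acknowledgement(s)
    }
}
```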