0 votes

The Kafka Consumer is not able to consume any messages when transactional semantics are set in its properties. But when I remove that property, or change it to read_uncommitted, it consumes messages.

Below are my Kafka Consumer properties:

    Properties props = new Properties();
    props.put("bootstrap.servers", "10.2.200.15:9092");
    String consumeGroup = "cg3";
    props.put("group.id", consumeGroup);
    // Key setting to turn off auto commit.
    props.put("enable.auto.commit", "false");
    props.put("heartbeat.interval.ms", "2000");
    props.put("session.timeout.ms", "6001");
    // Controls the maximum data returned on each poll; make sure this value is
    // bigger than the maximum size of a single message.
    props.put("max.partition.fetch.bytes", "140");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("isolation.level", "read_committed");

The Kafka Producer has a transactional id in its properties, and after pushing some messages it commits the transaction as a whole. Below are the Kafka Producer properties:

log.info("Initializing Properties"); Properties props = new Properties();

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("linger.ms", 1000);
    props.put("acks", "all");
    // props.put("request.timeout.ms",30000);
    props.put("retries", 3);
    props.put("retry.backoff.ms", 1000);
    props.put("max.in.flight.requests.per.connection", 1); // if its greater than 1, it can change the order or records. Maximum no. of unacknowledge request a client can send.
    props.put("enable.idempotence", true);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"Transaction8");

The snippet below is responsible for committing the transaction:

    public boolean send(ProducerRecordImpl record) {
        try {
            producer.beginTransaction();
            for (int i = 0; i < 10; i++) {
                Future<RecordMetadata> futureResult = producer
                        .send(new ProducerRecord<String, String>(record.getTopic(), record.getPayload()));
                /*
                 * Uncommenting the lines below would block until the send completes.
                 */
                // RecordMetadata ack = futureResult.get();
                // log.debug("RecordMetadata offset {} and partition {} ", ack.offset(), ack.partition());
            }
            producer.commitTransaction();
            log.info("Committed");
            return true;
        } catch (KafkaException e) {
            // Abort so these records are never exposed to read_committed consumers.
            producer.abortTransaction();
            log.error("Transaction aborted", e);
            return false;
        }
    }

I am not able to understand whether the commit is not happening properly on the Producer side, which would explain why the Kafka Consumer cannot read the messages with transactional semantics, or whether the problem lies on the Kafka Consumer side.

Any help will be appreciated.

I drilled down into the issue and found that my last stable offset is at 45, but my current offset on the consumer side is 365. read_committed will read only up to the last stable offset; when I changed my current offset to 30, the Kafka Consumer started consuming messages with the read_committed property. – priyam singh

So my question is: why is my last stable offset not getting updated by the Kafka Producer when I commit the transaction on that side? – priyam singh
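
For reference, a minimal sketch of how one could inspect the last stable offset (LSO) from the consumer side; the topic name, partition, and group id below are placeholders, not from the original post. With isolation.level=read_committed, endOffsets() returns the LSO rather than the high watermark:

    // Sketch: compare the consumer's position with the last stable offset (LSO).
    Properties lsoProps = new Properties();
    lsoProps.put("bootstrap.servers", "10.2.200.15:9092");
    lsoProps.put("group.id", "lso-check"); // placeholder group id
    lsoProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    lsoProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    lsoProps.put("isolation.level", "read_committed");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(lsoProps)) {
        TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
        consumer.assign(Collections.singletonList(tp));
        // For a read_committed consumer, endOffsets() reports the LSO.
        long lso = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
        // position() is the offset of the next record this consumer would read.
        long position = consumer.position(tp);
        System.out.printf("last stable offset = %d, consumer position = %d%n", lso, position);
    }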

2 Answers

0 votes

You need to call producer.initTransactions() first. Otherwise your producer is not publishing transactional messages.

From https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#initTransactions()

Needs to be called before any other methods when the transactional.id is set in the configuration. This method does the following:

1. Ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance had failed with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion.
2. Gets the internal producer id and epoch, used in all future transactional messages issued by the producer.
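
For completeness, a minimal sketch of the expected call sequence, following the pattern in the KafkaProducer javadoc; the topic name and values are placeholders, and producer is assumed to be built with the properties from the question:

    producer.initTransactions(); // must be called once, before the first beginTransaction()
    try {
        producer.beginTransaction();
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "value-" + i));
        }
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal errors: this producer instance cannot be recovered, so close it.
        producer.close();
    } catch (KafkaException e) {
        // Any other error: abort, so the records never become visible to read_committed consumers.
        producer.abortTransaction();
    }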

-1 votes

I had the same problem when testing transactions on Kafka. The problem was the operating system: I was running the Kafka brokers on Windows 10, and the consumer couldn't see any committed transactions when it was configured with read_committed. Once I moved the brokers to Linux, the transactions (and the consumers) started working. By the way, Kafka didn't show any errors in the logs. Hope it helps.