My Kafka consumer is not able to consume any messages when transactional semantics are set in its properties. But when I remove that property, or change it to read_uncommitted, it consumes messages.
Below are my Kafka consumer properties:
Properties props = new Properties();
props.put("bootstrap.servers", "10.2.200.15:9092");
String consumeGroup = "cg3";
props.put("group.id", consumeGroup);
// Below is a key setting to turn off the auto commit.
props.put("enable.auto.commit", "false");
props.put("heartbeat.interval.ms", "2000");
props.put("session.timeout.ms", "6001");
// Control the maximum data returned on each poll; make sure this value is
// bigger than the maximum single message size
props.put("max.partition.fetch.bytes", "140");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("isolation.level","read_committed");
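For reference, this is the full poll loop I would expect to work with these properties. The topic name "my-topic" is a placeholder for my actual topic; everything else matches the properties above:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.2.200.15:9092");
        props.put("group.id", "cg3");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (true) {
                // With read_committed, poll() only returns records from
                // committed transactions, up to the last stable offset (LSO)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync(); // manual commit, since auto-commit is off
            }
        }
    }
}
```

As I understand it, a read_committed consumer only reads up to the last stable offset, so any transaction that was begun on the partition but never committed or aborted would stall consumption at that point.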
The Kafka producer has a transactional id in its properties, and after pushing some messages it commits the transaction as a whole. Below are the Kafka producer properties:
log.info("Initializing Properties");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", 1000);
props.put("acks", "all");
// props.put("request.timeout.ms",30000);
props.put("retries", 3);
props.put("retry.backoff.ms", 1000);
props.put("max.in.flight.requests.per.connection", 1); // maximum no. of unacknowledged requests a client can send; if greater than 1, it can change the order of records
props.put("enable.idempotence", true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"Transaction8");
The snippet below is responsible for committing the transaction:
public boolean send(ProducerRecordImpl record) {
    try {
        producer.beginTransaction();
        for (int i = 0; i < 10; i++) {
            Future<RecordMetadata> futureResult = producer
                    .send(new ProducerRecord<String, String>(record.getTopic(), record.getPayload()));
            /*
             * Blocking on the future would wait until the send completes.
             */
            //RecordMetadata ack = futureResult.get();
            //log.debug("RecordMetadata offset {} and partition {}", ack.offset(), ack.partition());
        }
        producer.commitTransaction();
        log.info("Committed");
        return true;
    } catch (KafkaException e) {
        log.error("Transaction failed, aborting", e);
        producer.abortTransaction();
        return false;
    }
}
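For comparison, my understanding of the expected transactional producer lifecycle is sketched below: initTransactions() must be called once before the first beginTransaction(), and a failed transaction should be aborted so a read_committed consumer is not blocked behind an open transaction. The topic name "my-topic" and the payload values are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.2.200.15:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "Transaction8");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Must be called exactly once, before any beginTransaction()
            producer.initTransactions();
            try {
                producer.beginTransaction();
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<>("my-topic", "payload-" + i)); // placeholder topic
                }
                producer.commitTransaction();
            } catch (KafkaException e) {
                // Abort so the partition is not left with an open transaction
                producer.abortTransaction();
            }
        }
    }
}
```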
I am not able to understand whether the commit is not happening properly on the producer side, which would leave the Kafka consumer unable to read the messages with transactional semantics, or whether the problem persists on the Kafka consumer side.
Any help will be appreciated.