I am trying to implement exactly-once semantics with an idempotent producer and transactions for a banking use case.
I have created the producer like this:
String topicName = "exonce";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "1");
props.put("transaction.timeout.ms", "160000");
Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<String, String>(topicName, "val_"));
producer.commitTransaction();
System.out.println("Message sent successfully");
producer.close();
But I do not receive anything on the consumer side, and I also do not see the "Message sent successfully" output, which should at least have been printed.
The program does not end; it just waits for something to happen (no errors or exceptions). It is blocked at the producer.initTransactions(); line.
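To confirm where the call blocks instead of waiting indefinitely, a small timeout wrapper like the one below could be used. This is only a sketch using the standard library: `BlockingCallProbe` and `completesWithin` are hypothetical names, and the sleeping `Runnable` in `main` is a stand-in for `producer.initTransactions()`, not the real Kafka call. Passing `producer::initTransactions` as the `Runnable` in the actual program is an assumption about how it would be wired in.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockingCallProbe {

    /**
     * Runs the call on a daemon worker thread and reports whether it finished
     * within timeoutMs. On timeout, prints the worker's stack trace to stderr
     * so the blocking frame is visible.
     */
    public static boolean completesWithin(Runnable call, long timeoutMs) {
        final Thread[] worker = new Thread[1];
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "probe-worker");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            worker[0] = t;
            return t;
        });
        Future<?> future = executor.submit(call);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Show where the worker thread is currently stuck.
            System.err.println("Call still blocked after " + timeoutMs + " ms:");
            for (StackTraceElement frame : worker[0].getStackTrace()) {
                System.err.println("    at " + frame);
            }
            future.cancel(true); // interrupt the worker
            return false;
        } catch (ExecutionException e) {
            // The call itself failed; surface its cause.
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for producer.initTransactions(): sleeps past the timeout.
        Runnable slowCall = () -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("fast call finished: " + completesWithin(() -> { }, 1_000));
        System.out.println("slow call finished: " + completesWithin(slowCall, 200));
    }
}
```

If the wrapped call times out, the printed stack trace shows the frame it is waiting in, which is more informative than a program that never ends.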
This is the log:
17/07/12 08:46:36 INFO producer.ProducerConfig: ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = true
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 1
retry.backoff.ms = 100
transaction.timeout.ms = 160000
transactional.id = 1
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
17/07/12 08:46:36 INFO producer.KafkaProducer: Instantiated a transactional producer.
17/07/12 08:46:36 INFO producer.KafkaProducer: Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
17/07/12 08:46:37 INFO utils.AppInfoParser: Kafka version : 0.11.0.0
17/07/12 08:46:37 INFO utils.AppInfoParser: Kafka commitId : cb8625948210849f
17/07/12 08:46:37 INFO internals.TransactionManager: [TransactionalId 1] ProducerId set to -1 with epoch -1
I am not sure what mistake I am making here.
I am using Kafka 0.11.0.0.
Please let me know if you need any more information.
Appreciate your help & support.
Thanks
Raj