2 votes

I am trying to implement exactly-once semantics with an idempotent producer and transactions for a banking use case.

I have created the producer like this:

String topicName = "exonce";

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("transactional.id", "1");
props.put("transaction.timeout.ms", "160000");

Thread.currentThread().setContextClassLoader(null);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.initTransactions();
producer.beginTransaction();

producer.send(new ProducerRecord<String, String>(topicName, "val_"));

producer.commitTransaction();
System.out.println("Message sent successfully");
producer.close();

But I am not getting anything on the consumer side, and I also don't see the sysout "Message sent successfully"; at the very least that sysout should have been printed.

The program isn't ending; it just waits for something to happen (no errors/exceptions). It is stuck at the producer.initTransactions(); line.

This is the log:

17/07/12 08:46:36 INFO producer.ProducerConfig: ProducerConfig values:
    acks = all
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = true
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 1
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    transaction.timeout.ms = 160000
    transactional.id = 1
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

17/07/12 08:46:36 INFO producer.KafkaProducer: Instantiated a transactional producer.
17/07/12 08:46:36 INFO producer.KafkaProducer: Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
17/07/12 08:46:37 INFO utils.AppInfoParser: Kafka version : 0.11.0.0
17/07/12 08:46:37 INFO utils.AppInfoParser: Kafka commitId : cb8625948210849f
17/07/12 08:46:37 INFO internals.TransactionManager: [TransactionalId 1] ProducerId set to -1 with epoch -1

I am not sure what mistake I am making here.

I am using Kafka 0.11.0.0.

Please let me know if you need any more information.

Appreciate your help & support.

Thanks

Raj


3 Answers

3 votes

Is this a single node installation? Can you check your server.log for whether the __transaction_state topic was created properly? It needs 3 replicas in order to be created, and it is only created on the first initTransactions request. So if you don't have enough brokers, the creation of the topic will fail, and the initTransactions request may block forever.
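
If you want to verify this from the client side as well, the AdminClient added in 0.11 can describe the topic directly. This is only a hedged sketch, assuming the broker runs on localhost:9092 and kafka-clients 0.11.0.0 is on the classpath (the class name CheckTransactionStateTopic is made up for illustration):

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class CheckTransactionStateTopic {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            // Blocks until the broker answers; the future fails if the topic does not exist
            TopicDescription desc = admin.describeTopics(Collections.singleton("__transaction_state"))
                    .values().get("__transaction_state").get();
            System.out.println("__transaction_state has " + desc.partitions().size() + " partitions");
        } catch (ExecutionException e) {
            // An UnknownTopicOrPartitionException cause means the topic was never created
            System.out.println("__transaction_state not available: " + e.getCause());
        } finally {
            admin.close();
        }
    }
}

If the topic is missing, the broker-side settings in the next answer are the likely fix.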

3 votes

This is what I found out after following the hint above that transactions by default require 3 brokers. Furthermore, by default, transactions require 2 in-sync replicas.

In my Docker-based environment I therefore reduced those two settings on my broker:

KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

These correspond to the broker settings documented at https://kafka.apache.org/0110/documentation.html#brokerconfigs :

transaction.state.log.min.isr
transaction.state.log.replication.factor

Note: ISR stands for in-sync replicas
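
For a plain (non-Docker) broker, the presumably equivalent overrides in server.properties would be:

# Allow the __transaction_state topic to be created on a single broker
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

Restart the broker afterwards; as noted in the answer above, the topic is only created on the first initTransactions request.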

And one more thing: in case you are using Confluent's default platform setup (not the Docker containers), these settings are already preconfigured.

-1 votes

Updated answer: try running the example from the KafkaProducer javadoc as a test that the broker is configured correctly for 0.11 with the 0.11 client protocol:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
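
Once the broker side works, also note that a consumer that should only ever see committed transactional records needs isolation.level=read_committed. A minimal sketch, assuming the exonce topic from the question and the 0.11 consumer API (the group id exonce-reader is made up):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "exonce-reader"); // hypothetical group id
props.put("isolation.level", "read_committed"); // only return records from committed transactions
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("exonce"));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000); // 0.11 uses the long-timeout poll
        for (ConsumerRecord<String, String> record : records)
            System.out.println(record.value());
    }
} finally {
    consumer.close();
}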