I'm working on a Kafka use case where I need transactional semantics on both the producer and the consumer side. Using the Kafka 0.11 transactions API I'm able to publish transactional messages to the cluster, but on the consumer side I'm running into an issue: with isolation.level=read_committed set in the properties file I'm not able to consume anything, while with isolation.level=read_uncommitted the messages do come through, which is not what I want.
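My expectation is that even a bare-bones consumer like the sketch below (hard-coded config, same PayDeserializer as the rest of my project; the class name ReadCommittedCheck is just for illustration) should return the record once the producer's commitTransaction() succeeds:

package com.org.kafkaPro;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReadCommittedCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "read-committed-check"); // throwaway group id for this test
        props.put("auto.offset.reset", "earliest");
        props.put("isolation.level", "read_committed"); // only committed records should be returned
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.org.kafkaPro.PayDeserializer");
        KafkaConsumer<String, Paymnt> consumer = new KafkaConsumer<String, Paymnt>(props);
        try {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // 0.11 clients use the poll(long) overload
                ConsumerRecords<String, Paymnt> records = consumer.poll(1000);
                for (ConsumerRecord<String, Paymnt> record : records) {
                    System.out.println("partition " + record.partition()
                            + " offset " + record.offset() + " value " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}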
Producer code
package com.org.kafkaPro;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.KafkaException;
public class ProducerWithTx {

    public static void main(String[] args) throws FileNotFoundException {
        URL in = ProducerWithTx.class.getResource("producertx.properties");
        Properties props = new Properties();
        try {
            props.load(new FileReader(new File(in.getFile())));
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        Paymnt pay1 = new Paymnt();
        pay1.setAccid(1);
        pay1.setAccountstate("y");
        pay1.setAccountzipcode(111);
        pay1.setBankid(12);
        pay1.setCreditcardtype(15);
        pay1.setCustomerid("2");
        SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd");
        Date t = null;
        try {
            t = ft.parse("2017-11-10");
        } catch (ParseException e) {
            e.printStackTrace();
        }
        pay1.setPeriodid(t);

        String timeStamp = new SimpleDateFormat("yyyy.MM.dd:HH:mm:ss").format(new Date());
        props.put("transactional.id", "accid=" + pay1.getAccid() + " custid=" + pay1.getCustomerid() + " timestmp=" + timeStamp);

        KafkaProducer<String, Paymnt> producer = new KafkaProducer<String, Paymnt>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            //RecordMetadata metadata = producer.send(new ProducerRecord<String, Paymnt>("test", pay1)).get();
            producer.send(new ProducerRecord<String, Paymnt>("test", pay1));
            producer.commitTransaction();
            //System.out.println("written to " + metadata.partition());
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }
}
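For reference, restoring the commented-out metadata lines above makes the send blocking; that variant of the try block would look roughly like the sketch below (it additionally needs imports for org.apache.kafka.clients.producer.RecordMetadata and java.util.concurrent.ExecutionException, since get() can throw):

try {
    producer.beginTransaction();
    // get() blocks until the broker acks and returns the record's location
    RecordMetadata metadata = producer.send(new ProducerRecord<String, Paymnt>("test", pay1)).get();
    System.out.println("written to partition " + metadata.partition() + " offset " + metadata.offset());
    producer.commitTransaction();
} catch (InterruptedException | ExecutionException e) {
    // get() surfaces send failures here; abort so the transaction is not left open
    producer.abortTransaction();
}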
producertx.properties
# metadata.broker.list is a legacy Scala-producer setting; the Java producer ignores it and uses bootstrap.servers
metadata.broker.list=localhost:9092
bootstrap.servers=localhost:9092
acks=all
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.org.kafkaPro.PaySerializer
#transactional.id=1
enable.idempotence=true
# num.partitions is a broker-side setting and has no effect in a producer config
num.partitions=3
Consumer
package com.org.kafkaPro;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Consumer {

    private static List<ConsumerMultiThreaded> consumersGroup;

    public static void main(String[] args) throws IOException {
        URL in = ProducerWithTx.class.getResource("consumer.properties");
        Properties props = new Properties();
        try {
            props.load(new FileReader(new File(in.getFile())));
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        consumersGroup = new ArrayList<ConsumerMultiThreaded>();
        ConsumerMultiThreaded con1 = new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con2 = new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con3 = new ConsumerMultiThreaded(props);
        ConsumerMultiThreaded con4 = new ConsumerMultiThreaded(props);
        consumersGroup.add(con1);
        consumersGroup.add(con2);
        consumersGroup.add(con3);
        consumersGroup.add(con4);
        for (ConsumerMultiThreaded consumer : consumersGroup) {
            Thread t = new Thread(consumer);
            t.start();
        }
        while (true) {
            try {
                Thread.sleep(100000);
            } catch (InterruptedException ie) {
                // keep the main thread alive while the consumer threads run
            }
        }
    }
}
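As an aside, nothing here ever calls shutdown(), so the threads only stop when the JVM exits. A shutdown hook along these lines, added in main after the consumers are created, would close them cleanly (just a sketch):

// Sketch: close all consumers on JVM exit; assumes this runs in main()
// after consumersGroup has been populated.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        for (ConsumerMultiThreaded consumer : consumersGroup) {
            consumer.shutdown(); // sets the closed flag and wakes up poll()
        }
    }
}));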
Consumer Runnable
package com.org.kafkaPro;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerMultiThreaded implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, Paymnt> consumer;
    private final int minBatchSize = 3;
    private final List<ConsumerRecord<String, Paymnt>> buffer;

    public ConsumerMultiThreaded(Properties props) {
        this.consumer = new KafkaConsumer<String, Paymnt>(props);
        buffer = new ArrayList<ConsumerRecord<String, Paymnt>>(minBatchSize);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("test"));
            while (!closed.get()) {
                ConsumerRecords<String, Paymnt> records = consumer.poll(10000);
                for (ConsumerRecord<String, Paymnt> record : records) {
                    buffer.add(record);
                }
                // The buffer only gates when to process; the records that are
                // printed and committed below come from the current poll.
                if (buffer.size() >= minBatchSize) {
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, Paymnt>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, Paymnt> record : partitionRecords) {
                            System.out.println("record consumed by Thread " + Thread.currentThread().getId()
                                    + " from partition " + record.partition()
                                    + " offset " + record.offset()
                                    + " value: " + record.value().toString());
                        }
                        // Commit the offset after the last record of each partition.
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                        buffer.clear();
                    }
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
consumer.properties
bootstrap.servers=localhost:9092
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.org.kafkaPro.PayDeserializer
enable.auto.commit=false
auto.offset.reset=earliest
group.id=test
isolation.level=read_committed
I'd appreciate your help. Thank you.