
The Spring-managed KafkaTemplate provides

template.send(record).addCallback(...
template.executeInTransaction(...

Now let's say I have a method doWork() which is triggered on an event (say a TCP/IP message).

@Autowired
KafkaTemplate template;

// This method is triggered on an event
void doWork(EventType event) {
    switch (event) {
        case Events.Type1:
            template.send(record);
            break;
        case Events.Type2:
            // Question: how do I commit all my previous sends here?
            break;
        default:
            break;
    }
}

Basically, I can get a transaction by adding @Transactional over doWork() or by calling

template.executeInTransaction(...

in code. But I want to batch several template.send() calls and commit only after several calls to the doWork() method. How do I achieve that?

My producer configuration has transactions enabled, and a KafkaTransactionManager is wired to the producer factory.

1 Answer

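kafkaTemplate.executeInTransaction(t -> {
    boolean stayInTransaction = true;
    while (stayInTransaction) {
        Event event = readTcp();
        doWork(event);
        // keep looping until the event that ends the batch arrives
        stayInTransaction = !transactionDone(event);
    }
    return null; // the callback must return a value
});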

As long as the doWork() method uses the same template and runs within the scope of the callback, its sends take part in the transaction. The transaction is committed when the callback returns normally.

Or

@Transactional
public void doIt() {
    boolean stayInTransaction = true;
    while (stayInTransaction) {
        Event event = readTcp();
        doWork(event);
        // keep looping until the event that ends the batch arrives
        stayInTransaction = !transactionDone(event);
    }
}

Use the second form when using declarative transactions.

If the TCP events arrive asynchronously, you will need to hand them off to the thread running the transaction, for example through a BlockingQueue<?>.
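The hand-off could look roughly like the sketch below. It is plain Java with no Kafka dependency, so it only illustrates the queueing pattern. The names HandOffSketch, Event, Type, onTcpEvent and drainOneBatch are made up for illustration, and the list named sent stands in for template.send(...). In a real application the loop in drainOneBatch() would presumably run inside the executeInTransaction callback or the @Transactional method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HandOffSketch {

    // Hypothetical event types: TYPE1 carries a payload to send, TYPE2 ends the batch.
    enum Type { TYPE1, TYPE2 }

    // Hypothetical event class; not part of Spring or Kafka.
    static final class Event {
        final Type type;
        final String payload;

        Event(Type type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Queue shared between the TCP callback thread and the transaction thread.
    static final BlockingQueue<Event> QUEUE = new LinkedBlockingQueue<>();

    // Called from the asynchronous TCP callback. It only enqueues and never sends.
    static void onTcpEvent(Event event) {
        QUEUE.add(event); // the queue is unbounded, so add() does not block
    }

    // Runs on the thread that owns the transaction. Collects payloads until a
    // TYPE2 event arrives, then returns; returning is the point at which the
    // surrounding transaction would commit.
    static List<String> drainOneBatch() {
        List<String> sent = new ArrayList<>();
        try {
            while (true) {
                Event event = QUEUE.take(); // blocks until the next event arrives
                if (event.type == Type.TYPE2) {
                    return sent;
                }
                sent.add(event.payload); // stand-in for template.send(record)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for events", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate the TCP thread delivering two sends followed by a commit marker.
        Thread tcp = new Thread(() -> {
            onTcpEvent(new Event(Type.TYPE1, "a"));
            onTcpEvent(new Event(Type.TYPE1, "b"));
            onTcpEvent(new Event(Type.TYPE2, null));
        });
        tcp.start();
        List<String> batch = drainOneBatch();
        tcp.join();
        System.out.println(batch);
    }
}
```

With this shape the TCP thread never touches the template, so all sends happen on the one thread that holds the transaction.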