1
votes

I am new to Spring and Kafka. My use case is to consume from a Kafka topic and produce to another topic using a transactional producer (each message should be processed only once). I saw the discussion in this thread (https://github.com/spring-projects/spring-kafka/issues/645) but implemented it a little differently: I set manual ack mode in the listener container factory and acknowledge after sending to the producer with kafkaTemplate.executeInTransaction (an async send). Does that achieve the same result as the approach in the thread? Since the send is asynchronous, I am not sure it serves the purpose.

Also, in the example from issue 645, when does the actual commit to the Kafka broker happen (that is, when do consumers see the data)? Does it happen on a commit interval, or record by record? I am trying to understand whether the commit happens on a time interval, for every record, or whether it is configurable.

2 Answers

1
votes

If you are using transactions you should not commit offsets via the consumer; instead, you should send the offsets to the transaction using the producer.

If properly configured, the listener container will do it automatically for you when the listener exits. See the documentation.

By configuring the listener container with a KafkaTransactionManager, the container starts the transaction. Any sends on a transactional KafkaTemplate will participate in the transaction and the container will send the offsets to the transaction (and commit the transaction) when the listener exits normally.
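The setup described above could look roughly like the following. This is only a sketch under assumptions, not the configuration from the issue thread: the class names, topic names ("input-topic", "output-topic"), group id and String key/value types are made up for illustration, and the ProducerFactory bean is assumed to have been created with a transaction id prefix so that it is transactional. Depending on your spring-kafka version, setTransactionManager on the container properties may be deprecated in favor of a Kafka-specific setter, so check the Javadocs for the version you use.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.stereotype.Component;

@Configuration
@EnableKafka
public class TransactionalRelayConfig {

    // Assumption: the injected ProducerFactory has a transactionIdPrefix set,
    // which is what makes its producers transactional.
    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTransactionManager<String, String> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // With a transaction manager set, the container starts the transaction
        // before calling the listener and sends the offsets to it afterwards.
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        return factory;
    }
}

// Hypothetical listener: consumes from one topic and relays to another.
@Component
class RelayListener {

    private final KafkaTemplate<String, String> template;

    RelayListener(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    @KafkaListener(topics = "input-topic",
            containerFactory = "kafkaListenerContainerFactory",
            groupId = "relay-group")
    public void relay(String value) {
        // Plain send(): participates in the transaction the container started.
        // No Acknowledgment parameter and no manual offset commit here.
        template.send("output-topic", value);
    }
}
```

Note that the listener takes no Acknowledgment argument: with this arrangement the offsets travel with the producer's transaction rather than being committed by the consumer.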

See the Javadocs for executeInTransaction...

/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

Such operations will not participate in the container's transaction.

1
votes

As for committing consumer offsets, there are two ways. One is to enable auto-commit:

enable.auto.commit=true
auto.commit.interval.ms      // how often offsets are auto-committed, in milliseconds
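As a small illustration, the two settings above can be assembled as plain consumer properties. This sketch uses only java.util.Properties with the literal Kafka property names; the class name, the broker address, the group id and the 5000 ms interval are example values chosen here, not values from the question.

```java
import java.util.Properties;

public class AutoCommitConsumerProps {

    // Builds consumer properties with auto-commit enabled.
    // All argument values are supplied by the caller; nothing here talks to a broker.
    public static Properties build(String bootstrapServers, String groupId, long intervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only (assumptions for the demo).
        Properties props = build("localhost:9092", "demo-group", 5000L);
        System.out.println(props.getProperty("enable.auto.commit"));
        System.out.println(props.getProperty("auto.commit.interval.ms"));
    }
}
```

Such a Properties object is what you would typically hand to a consumer constructor or translate into the equivalent Spring configuration.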

The other way is to commit offsets manually through an Acknowledgment:

// Requires a listener container configured with a manual ack mode.
@KafkaListener(topics = "${kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory", groupId = "${kafka.consumer.groupId}")
public void taskListener(Task task, Acknowledgment ack) {
    log.info(task.toString());
    // Mark this record as processed so its offset can be committed.
    ack.acknowledge();
}

With auto-commit, the consumer runs a check on every poll: if the time elapsed since the last commit is greater than the configured interval, it commits the offsets.
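The timing check described above can be modeled in a few lines. This is a simplified stand-alone model for illustration, not the Kafka client's actual code: the class AutoCommitTimer and its methods are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```java
public class AutoCommitTimer {

    private final long intervalMs;
    private long nextDeadlineMs;
    private long committedOffset = -1; // -1 means nothing committed yet

    public AutoCommitTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDeadlineMs = nowMs + intervalMs;
    }

    // Called once per poll. Commits only if the interval has elapsed;
    // returns true when a commit happened.
    public boolean maybeCommit(long nowMs, long lastConsumedOffset) {
        if (nowMs < nextDeadlineMs) {
            return false; // interval not reached yet, nothing is committed
        }
        committedOffset = lastConsumedOffset;
        nextDeadlineMs = nowMs + intervalMs;
        return true;
    }

    public long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        AutoCommitTimer timer = new AutoCommitTimer(5000, 0);
        System.out.println(timer.maybeCommit(1000, 10)); // prints false
        System.out.println(timer.maybeCommit(5000, 25)); // prints true
        System.out.println(timer.committedOffset());     // prints 25
    }
}
```

The point of the model is that the commit is driven by polling: if the application stops polling, the check never runs and no offsets are committed, regardless of the interval.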