In my application we 1) consume from a topic (firstTopic), 2) process the request by calling some third-party services (we have a RetryTemplate here for exponential backoff), and 3) submit the result to a second topic (secondtopic).
We also manually commit the offset after a successful submission. A KafkaTransactionManager is used to manage the transactions. Since the message is posted to firstTopic through a RestController, our @Transactional starts there and ends when the offset is updated. We are using executeInTransaction() for this.
KafkaConfiguration
@Bean
public ProducerFactory<String, User> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.RETRIES_CONFIG, 10);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
//defaultKafkaProducerFactory.transactionCapable();
return defaultKafkaProducerFactory;
//return new DefaultKafkaProducerFactory<>(config);
}
/**
* New configuration for the consumerFactory added
*
* @return
*/
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<User>(User.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setTransactionManager(transactionManager());
factory.setRetryTemplate(kafkaRetry());
factory.setStatefulRetry(true);
factory.setErrorHandler(getErrorHandler());
factory.setRecoveryCallback(retryContext -> {
//implement the logic to decide the action after all retries are over.
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
System.out.println("Recovery is called for message " + consumerRecord.value());
return Optional.empty();
});
return factory;
}
public RetryTemplate kafkaRetry() {
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(10 * 1000);
backOffPolicy.setMultiplier(1); // a multiplier of 1 keeps every interval at 10 s; use a value > 1 for exponential growth
backOffPolicy.setMaxInterval(60 * 1000); // original 25 * 60 * 1000
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(4);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
public SeekToCurrentErrorHandler getErrorHandler() {
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
//super.handle(thrownException, records, consumer, container);
if (!records.isEmpty()) {
ConsumerRecord<?, ?> record = records.get(0);
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
if (thrownException instanceof DeserializationException) {
System.out.println("------1111------deserialization exception ");
} else {
// records is not empty here: rewind to the failed record so it is redelivered
System.out.println("------xxxx------Seeking back to failed record ");
consumer.seek(new TopicPartition(topic, partition), offset);
}
} else {
System.out.println("------4444------Record is empty ");
}
}
};
return errorHandler;
}
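As a side note on kafkaRetry() above, the pause schedule it produces can be modelled in plain Java. This is only a sketch of capped exponential backoff arithmetic, not Spring Retry itself: the class BackoffSketch and the method intervals are hypothetical names, and the multiplier of 2 in the second call is an assumed value for comparison. It assumes that maxAttempts counts the first call, so 4 attempts give 3 pauses.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the pauses of a capped exponential backoff (illustrative only, not Spring Retry). */
final class BackoffSketch {

    /**
     * Returns the pauses between attempts, in milliseconds.
     * maxAttempts includes the first call, so there are maxAttempts - 1 pauses.
     */
    static List<Long> intervals(long initialMs, double multiplier, long maxMs, int maxAttempts) {
        List<Long> pauses = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            pauses.add(Math.min((long) next, maxMs)); // cap each pause at maxMs
            next = next * multiplier;                 // grow the following pause
        }
        return pauses;
    }

    public static void main(String[] args) {
        // Values from kafkaRetry(): a multiplier of 1 keeps every pause at 10 s.
        System.out.println(intervals(10_000, 1.0, 60_000, 4)); // [10000, 10000, 10000]
        // Hypothetical multiplier of 2: pauses double until they hit the 60 s cap.
        System.out.println(intervals(10_000, 2.0, 60_000, 5)); // [10000, 20000, 40000, 60000]
    }
}
```

With the configured multiplier of 1 the policy behaves like a fixed 10-second backoff, so the 60-second cap is never reached.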
Rest Controller
@RestController
@RequestMapping("accounts")
public class UserResource {
@Autowired
KafkaTemplate<String, User> kafkaTemplate;
@PostMapping("/users")
@Transactional
public String postComments(@RequestParam("firstName") final String firstName,
@RequestParam("lastName") final String lastName,
@RequestParam("userName") final String userName) {
List<String> accountTypes = new ArrayList<String>();
kafkaTemplate.send("firstTopic", new User(firstName, lastName, userName));
return "Message sent to the Error queue";
}
}
Service
@Service
public class KafkaMessageConsumerService {
@Autowired
KafkaTemplate<String, User> kafkaTemplate;
int index = 0;
@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
//@Transactional
public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {
kafkaTemplate.executeInTransaction(t -> {
int number = (int) (Math.random() * 10);
t.send("secondtopic", user);
// Always true for a non-negative int: deliberately fails every attempt to test the rollback
if (number % 2 == 0 || number % 2 == 1) {
System.out.println("about to ack");
throw new RuntimeException(" Modulus is zero ");
}
acknowledgment.acknowledge();
return true;
});
}
}
We have also configured producer retries (10) for sends to secondtopic. Inside executeInTransaction(), where we produce to secondtopic, we are testing producer failures by manually throwing an exception between the send to secondtopic and the offset commit. What we observe is that when the exception is thrown (inside executeInTransaction()), the logic outside executeInTransaction() is retried, and after 10 retries this exception is thrown:
2020-06-02 11:10:34.869 ERROR 685813 --- [ntainer#0-0-C-1] o.s.k.l.DefaultAfterRollbackProcessor : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for ConsumerRecord(topic = firstTopic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1591114230710, serialized key size = -1, serialized value size = 61, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = com.barade.sandesh.springKafka.model.User@4d4040ae)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.barade.sandesh.springKafka.service.KafkaMessageConsumerService.onCustomerMessage(com.barade.sandesh.springKafka.model.User,org.springframework.kafka.support.Acknowledgment) throws java.lang.Exception' threw exception; nested exception is java.lang.RuntimeException: Modulus is zero
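The numbers in the log line can be read as follows: FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} appears to mean nine redeliveries after the first delivery, with no pause, which would make these ten attempts the container's after-rollback redelivery rather than the producer's retries setting. The plain-Java sketch below models that redeliver-then-recover loop. RedeliverySketch, deliver and the recoverer callback are hypothetical names for illustration, not spring-kafka API; whether DefaultAfterRollbackProcessor accepts a comparable recoverer callback depends on the spring-kafka version in use and should be checked against its documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Models "redeliver up to N times, then hand the record to a recoverer" (illustrative only). */
final class RedeliverySketch {

    /**
     * Calls the listener up to maxRetries + 1 times. If every attempt throws, passes the record
     * and the last exception to the recoverer. Returns the number of attempts made.
     */
    static <T> int deliver(T record, Consumer<T> listener, int maxRetries,
                           BiConsumer<T, Exception> recoverer) {
        int attempts = 0;
        RuntimeException last = null;
        while (attempts <= maxRetries) {
            attempts++;
            try {
                listener.accept(record);
                return attempts;            // success: stop redelivering
            } catch (RuntimeException e) {
                last = e;                   // treated as a rollback: deliver again
            }
        }
        recoverer.accept(record, last);     // retries exhausted: apply the business rule
        return attempts;
    }

    public static void main(String[] args) {
        List<String> recovered = new ArrayList<>();
        int attempts = deliver("user-1",
                r -> { throw new RuntimeException("Modulus is zero"); }, // always fails, as in the test
                9,                                                        // maxAttempts=9 from the log
                (r, e) -> recovered.add(r + ": " + e.getMessage()));
        System.out.println(attempts);  // 10
        System.out.println(recovered); // [user-1: Modulus is zero]
    }
}
```

In this model the recoverer is the single place where a record that has exhausted its redeliveries is handled, for example by publishing it to a separate topic.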
Is this the correct way of handling producer failures?
Is there a way to capture the exception after the 10th retry and process the message according to a business rule? Can we have two RetryTemplates and two SeekToCurrentErrorHandlers?
… executeInTransaction in any of the above code. 2) There is no listener shown at all and no code calling an Acknowledgment. Also provide version information. – Gary Russell