
In my application, 1) we consume from a topic (firstTopic), 2) process the request by calling some third-party services (we have a RetryTemplate here for exponential backoff), and 3) submit to a second topic (secondtopic).

We also manually update the offset after a successful submission. A KafkaTransactionManager is used to maintain the transactions. Since the message is posted to firstTopic through a RestController, our @Transactional starts there and ends when the offset is updated. We are using executeInTransaction() for this.

KafkaConfiguration


@Bean
public ProducerFactory<String, User> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    config.put(ProducerConfig.RETRIES_CONFIG, 10);
    config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
    DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
    // Setting a transactionIdPrefix makes the factory transaction-capable.
    defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
    return defaultKafkaProducerFactory;
}

/**
 * New configuration added for the consumerFactory.
 *
 * @return the consumer factory for the firstTopic listener
 */
@Bean
public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // The deserializer instances passed to the constructor take precedence,
    // so the *_DESERIALIZER_CLASS_CONFIG entries are redundant here.
    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}
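
One thing worth noting (this line is an addition, not part of the original config): since the records on these topics are produced transactionally, the consumer typically also needs read_committed isolation so that records from rolled-back transactions are skipped; a minimal addition, assuming the same config map as above:

config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // addition: skip records from aborted transactions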


@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setTransactionManager(transactionManager());
    factory.setRetryTemplate(kafkaRetry());
    factory.setStatefulRetry(true);
    factory.setErrorHandler(getErrorHandler());
    factory.setRecoveryCallback(retryContext -> {
        // Called once all retries are exhausted; put the business logic that
        // decides what to do with the failed record here.
        ConsumerRecord<?, ?> consumerRecord = (ConsumerRecord<?, ?>) retryContext.getAttribute("record");
        System.out.println("Recovery is called for message " + consumerRecord.value());
        return Optional.empty();
    });

    return factory;
}
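
The transactionManager() bean referenced above is not included in the post; a minimal sketch, assuming it simply wraps the transactional producerFactory() bean:

@Bean
public KafkaTransactionManager<String, User> transactionManager() {
    // Assumption: a KafkaTransactionManager built on the transactional producer factory above.
    return new KafkaTransactionManager<>(producerFactory());
}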


public RetryTemplate kafkaRetry() {
    RetryTemplate retryTemplate = new RetryTemplate();

    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(10 * 1000);
    // Note: a multiplier of 1 keeps every interval fixed at 10 s;
    // a value greater than 1 (e.g. 2.0) is needed for truly exponential back-off.
    backOffPolicy.setMultiplier(1);
    backOffPolicy.setMaxInterval(60 * 1000);       // original 25 * 60 * 1000
    retryTemplate.setBackOffPolicy(backOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(4);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}


public SeekToCurrentErrorHandler getErrorHandler() {
    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {

        @Override
        public void handle(Exception thrownException,
                           List<ConsumerRecord<?, ?>> records,
                           Consumer<?, ?> consumer,
                           MessageListenerContainer container) {
            if (!records.isEmpty()) {
                ConsumerRecord<?, ?> record = records.get(0);
                String topic = record.topic();
                long offset = record.offset();
                int partition = record.partition();

                if (thrownException instanceof DeserializationException) {
                    System.out.println("------1111------deserialization exception");
                } else {
                    // Re-seek so the failed record is redelivered on the next poll.
                    System.out.println("------xxxx------seeking back to the failed record");
                    consumer.seek(new TopicPartition(topic, partition), offset);
                }
            } else {
                System.out.println("------4444------Record list is empty");
            }
        }
    };

    return errorHandler;
}

Rest Controller

@RestController
@RequestMapping("accounts")
public class UserResource {

    @Autowired
    KafkaTemplate<String, User> kafkaTemplate;


    @PostMapping("/users")
    @Transactional
    public String postComments(@RequestParam("firstName") final String firstName,
                               @RequestParam("lastName") final String lastName,
                               @RequestParam("userName") final String userName) {

        // Sent within the transaction started by @Transactional.
        kafkaTemplate.send("firstTopic", new User(firstName, lastName, userName));

        return "Message sent to firstTopic";
    }

}

Service

@Service
public class KafkaMessageConsumerService {

    @Autowired
    KafkaTemplate<String, User> kafkaTemplate;

    @KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
    public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {


        kafkaTemplate.executeInTransaction(t -> {
            int number = (int) (Math.random() * 10);

            t.send("secondtopic", user);
            // number % 2 is always 0 or 1, so this condition is always true:
            // the exception is thrown deliberately to simulate a producer failure.
            if (number % 2 == 0 || number % 2 == 1) {
                System.out.println("about to ack");
                throw new RuntimeException(" Modulus is zero ");
            }
            acknowledgment.acknowledge();

            return true;
        });
    }


}

We have also added producer retries (10) for secondtopic. In executeInTransaction(), where we produce to secondtopic, we are testing producer failures by throwing a manual exception between the send to secondtopic and the offset commit. What we observe is that when the exception is thrown (inside executeInTransaction()), the logic outside executeInTransaction() is retried, and after 10 retries it throws this exception:

2020-06-02 11:10:34.869 ERROR 685813 --- [ntainer#0-0-C-1] o.s.k.l.DefaultAfterRollbackProcessor : Backoff FixedBackOff{interval=0, currentAttempts=10, maxAttempts=9} exhausted for ConsumerRecord(topic = firstTopic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1591114230710, serialized key size = -1, serialized value size = 61, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = com.barade.sandesh.springKafka.model.User@4d4040ae)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.barade.sandesh.springKafka.service.KafkaMessageConsumerService.onCustomerMessage(com.barade.sandesh.springKafka.model.User,org.springframework.kafka.support.Acknowledgment) throws java.lang.Exception' threw exception; nested exception is java.lang.RuntimeException: Modulus is zero

Is this the correct way of addressing producer failures?

Is there a way to capture the exception after the 10th retry and process the message according to a business rule? Can we keep two RetryTemplates and two SeekToCurrentErrorHandlers?

This description is incomplete: 1) there is no executeInTransaction in any of the above code. 2) There is no listener shown at all and no code calling an Acknowledgment. Also provide version information. – Gary Russell

I was getting a formatting issue when submitting the request; it must have been removed during the edits. I have added the code. – codebuilder

1 Answer


You shouldn't be using executeInTransaction there (unless you really want to perform the publishing in a different transaction). The listener container has already started a transaction.

Furthermore, the acknowledge() sends the offsets to the container's transaction, not the one started by the template.

This could be why you are getting strange errors on the acknowledgment; I don't know.

Simply remove the executeInTransaction and the template will participate in the container's transaction.
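
For illustration (a sketch, not part of the original answer), the listener with executeInTransaction removed; both the send and the acknowledgment then participate in the transaction started by the container:

@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
public void onCustomerMessage(User user, Acknowledgment acknowledgment) {
    // Runs inside the transaction the listener container started;
    // the template's send joins that transaction automatically.
    kafkaTemplate.send("secondtopic", user);
    // Sends the offset to the container's transaction; offsets and records
    // commit together when the listener returns without throwing.
    acknowledgment.acknowledge();
}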