
My Spring Boot service needs to consume Kafka events from one topic, do some processing (including writing to the database with JPA) and then produce some events on a new topic. Under no circumstances can I end up having published events without having updated the database, and if anything goes wrong I want the next poll of the consumer to retry the event. My processing logic, including the DB update, is idempotent, so retrying it is fine.

I think I have achieved exactly-once semantics as described at https://docs.spring.io/spring-kafka/reference/html/#exactly-once by using a ChainedKafkaTransactionManager, like so:

@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
    kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    // Transactions are started in the order given and committed in reverse order,
    // so the JPA transaction commits before the Kafka transaction.
    return new ChainedKafkaTransactionManager(kafka, jpa);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ChainedKafkaTransactionManager chainedTransactionManager) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // Run every listener invocation inside the chained Kafka + JPA transaction.
    factory.getContainerProperties().setTransactionManager(chainedTransactionManager);

    return factory;
}
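
For illustration, the listener side of this flow looks roughly like the sketch below; the component, repository, entity and topic names are placeholders rather than my real code. Because the container starts the chained transaction before invoking the listener, the method needs no @Transactional annotation, and the KafkaTemplate send participates in the same Kafka transaction.

// Sketch only - names are placeholders.
@Component
public class EventProcessor {

    private final MyEventRepository repository;                 // JPA repository (placeholder)
    private final KafkaTemplate<String, String> kafkaTemplate;  // backed by the transactional producer factory

    public EventProcessor(MyEventRepository repository, KafkaTemplate<String, String> kafkaTemplate) {
        this.repository = repository;
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "inbound-topic", containerFactory = "kafkaListenerContainerFactory")
    public void onEvent(String event) {
        repository.save(process(event));              // idempotent DB update, committed by the JPA transaction manager
        kafkaTemplate.send("outbound-topic", event);  // sent within the container's Kafka transaction
    }

    private MyEvent process(String event) {
        // idempotent processing (details omitted)
        return new MyEvent(event);
    }
}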

The relevant kafka config in my application.yaml file looks like:

  kafka:
    ...
    consumer:
      group-id: myGroupId
      auto-offset-reset: earliest
      properties:
        isolation.level: read_committed
      ...
    producer:
      transaction-id-prefix: ${random.uuid}
      ...

Because the commit order is critical to my application, I would like to write an integration test to prove that the commits happen in the desired order and that, if an error occurs during the commit to Kafka, the original event is consumed again. However, I am struggling to find a good way of causing a failure between the DB commit and the Kafka commit.

Any suggestions or alternative ways I could do this?

Thanks


1 Answer


You could use a custom ProducerFactory to return a MockProducer (provided by kafka-clients).

Set its commitTransactionException so that it is thrown when the KafkaTransactionManager (KTM) tries to commit the transaction.
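
The key piece is MockProducer's public commitTransactionException field; a minimal sketch (the full test below wires this into a ProducerFactory):

MockProducer<Object, Object> mockProducer = new MockProducer<>();
// The exception set here is thrown when commitTransaction() is called, i.e. exactly at the
// point where the Kafka side of the transaction would commit.
mockProducer.commitTransactionException = new RuntimeException("simulated Kafka commit failure");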

EDIT

Here is an example; it doesn't use the chained TM, but that shouldn't make a difference.

@SpringBootApplication
public class So66018178Application {

    public static void main(String[] args) {
        SpringApplication.run(So66018178Application.class, args);
    }

    // Listener under test; in the real application this is where the JPA write and the downstream send would happen.
    @KafkaListener(id = "so66018178", topics = "so66018178")
    public void listen(String in) {
        System.out.println(in);
    }

}

and application.properties:

spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest

@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
            throws InterruptedException {

        // Stop the container so a custom AfterRollbackProcessor can be installed before the test record is consumed.
        registry.getListenerContainer("so66018178").stop();
        AtomicReference<Exception> listenerException = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        // Capture the exception that is thrown when the Kafka transaction commit fails, and release the latch.
        ((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
                .setAfterRollbackProcessor(new AfterRollbackProcessor<>() {

                    @Override
                    public void process(List<ConsumerRecord<String, String>> records, Consumer<String, String> consumer,
                            Exception exception, boolean recoverable) {

                        listenerException.set(exception);
                        latch.countDown();
                    }
                });
        registry.getListenerContainer("so66018178").start();

        // Send a record with a real producer; when the listener container tries to commit its
        // transaction, the MockProducer throws and the transaction is rolled back.
        Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.send("so66018178", "test");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
                .hasCause(config.exception);
    }

    @Configuration
    public static class Config {

        RuntimeException exception = new RuntimeException("test");

        // Overrides Boot's auto-configured ProducerFactory so the listener container's
        // Kafka transaction uses the MockProducer below.
        @Bean
        public ProducerFactory<Object, Object> pf() {
            return new ProducerFactory<>() {

                @Override
                public Producer<Object, Object> createProducer() {
                    MockProducer<Object, Object> mockProducer = new MockProducer<>();
                    // Thrown when the transaction manager calls commitTransaction().
                    mockProducer.commitTransactionException = Config.this.exception;
                    return mockProducer;
                }

                @Override
                public Producer<Object, Object> createProducer(String txIdPrefix) {
                    Producer<Object, Object> producer = createProducer();
                    producer.initTransactions();
                    return producer;
                }

                @Override
                public boolean transactionCapable() {
                    return true;
                }

            };
        }

    }

}
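
If you also want the test to assert that nothing made it to Kafka, one option (a sketch, assuming you add a mockProducer field to Config so the test can inspect the mock) is:

    // In Config, keep the last created producer (assumed field):
    MockProducer<Object, Object> mockProducer;

    // ...and in the anonymous ProducerFactory, remember the instance:
    @Override
    public Producer<Object, Object> createProducer() {
        Config.this.mockProducer = new MockProducer<>();
        Config.this.mockProducer.commitTransactionException = Config.this.exception;
        return Config.this.mockProducer;
    }

Then, at the end of the test, after the latch assertion:

    assertThat(config.mockProducer.transactionCommitted()).isFalse(); // the Kafka commit never succeeded
    assertThat(config.mockProducer.history()).isEmpty();              // the listener here sends nothing; in your real flow you could assert on the sent records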