1 vote

I have a consume-transform-produce workflow in a microservice using Spring Boot with Spring Kafka. I need to achieve the exactly-once semantics provided by Kafka transactions. Here are the relevant code snippets:

Config

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(props);
    defaultKafkaProducerFactory.setTransactionIdPrefix("kafka-trx-");
    return defaultKafkaProducerFactory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}

@Bean
@Qualifier("chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager(KafkaTransactionManager<String, String> kafkaTransactionManager) {
    return new ChainedKafkaTransactionManager<>(kafkaTransactionManager);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(ChainedKafkaTransactionManager<String, Object> chainedKafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
    concurrentKafkaListenerContainerFactory.setBatchListener(true);
    concurrentKafkaListenerContainerFactory.setConcurrency(nexusConsumerConcurrency);
    //concurrentKafkaListenerContainerFactory.setReplyTemplate(kafkaTemplate());
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    concurrentKafkaListenerContainerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
    return concurrentKafkaListenerContainerFactory;
}

Listener

@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}", containerFactory = "concurrentKafkaListenerContainerFactory")
public void listen(@Payload List<String> msgs, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.OFFSET) List<Integer> offsets) {

    int i = -1;
    for (String msg : msgs) {
        ++i;
        LOGGER.debug("partition={}; offset={}; msg={}", partitions.get(i), offsets.get(i), msg);
        String json = transform(msg);
        kafkaTemplate.executeInTransaction(kt -> kt.send(producerTopic, json));
    }
}

However, in the production environment I ran into a strange problem: the offset increases by two for each message sent by the producer, and the consumer doesn't commit the consumed offset.

[Screenshot: consumer offsets of topic1]

[Screenshot: topic1 consumer detail]

[Screenshot: produce to topic2]

However, the count of messages sent by the producer matches the count consumed, and the downstream consumer of topic2 receives messages continuously. There are no errors or exceptions in the log.

I wonder why the consume-transform-produce workflow seems fine (exactly-once semantics also appear to be guaranteed), yet the consumed offset isn't committed and the produced offset increases by two instead of one per message.

How can I fix it? Thanks!


2 Answers

2 votes

That's the way it's designed. Kafka logs are immutable, so an extra "slot" is used at the end of the transaction to indicate whether the transaction was committed or rolled back. This allows consumers with the read_committed isolation level to skip over rolled-back transactions.

If you publish 10 records in a transaction, you will see the offset increase by 11. If you only publish one, it will increase by two.
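You can see this outside Spring with a minimal sketch using the plain kafka-clients API (the broker address, topic name, and transactional.id below are assumptions): it commits a single-record transaction and compares the log-end offset before and after, which advances by two because the commit marker occupies one offset.

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxMarkerDemo {

    public static void main(String[] args) throws InterruptedException {
        TopicPartition tp = new TopicPartition("demo-topic", 0);

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("key.serializer", StringSerializer.class.getName());
        pp.put("value.serializer", StringSerializer.class.getName());
        pp.put("transactional.id", "demo-tx-1"); // enables transactions (and idempotence)

        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("key.deserializer", StringDeserializer.class.getName());
        cp.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(pp);
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp)) {

            Map<TopicPartition, Long> before = consumer.endOffsets(List.of(tp));

            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), null, "one record"));
            producer.commitTransaction(); // coordinator writes a commit marker into the partition

            Thread.sleep(1000); // give the coordinator a moment to write the marker

            Map<TopicPartition, Long> after = consumer.endOffsets(List.of(tp));
            // One record + one commit marker: the end offset advances by 2, not 1.
            System.out.println("before=" + before.get(tp) + ", after=" + after.get(tp));
        }
    }
}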

If you want the publish to participate in the consumer-started transaction (for exactly-once), you should not be using executeInTransaction; that will start a new transaction.

/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
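So, for the exactly-once flow in your question, simply send on the template and let the send join the transaction started by the listener container. A minimal sketch of the corrected listener, using the names from your question:

@KafkaListener(topics = "${kafka.xxx.consumerTopic}", groupId = "${kafka.xxx.consumerGroup}", containerFactory = "concurrentKafkaListenerContainerFactory")
public void listen(@Payload List<String> msgs) {
    for (String msg : msgs) {
        String json = transform(msg);
        // No executeInTransaction here: this send participates in the transaction
        // started by the listener container, so the produced records and the
        // consumed offsets are committed (or rolled back) atomically.
        kafkaTemplate.send(producerTopic, json);
    }
}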

I don't see why the consumer offset wouldn't still be sent to the consumer-started transaction, though. You should turn on DEBUG logging to see what's happening (if it still happens after you fix the template code).
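For example, in application.properties:

logging.level.org.springframework.kafka=DEBUG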

EDIT

The consumed offset (+1) is sent to the transaction by the listener container when the listener exits; turn on commit logging and you will see it...

@SpringBootApplication
public class So59152915Application {

    public static void main(String[] args) {
        SpringApplication.run(So59152915Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @KafkaListener(id = "foo", topics = "so59152915-1", clientIdPrefix = "so59152915")
    public void listen1(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
        System.out.println(in + "@" + offset);
        this.template.send("so59152915-2", in.toUpperCase());
        Thread.sleep(2000);
    }

    @KafkaListener(id = "bar", topics = "so59152915-2")
    public void listen2(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic("so59152915-1", 1, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("so59152915-2", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            this.template.executeInTransaction(t -> {
                IntStream.range(0, 11).forEach(i -> t.send("so59152915-1", "foo" + i));
                try {
                    System.out.println("Hit enter to commit sends");
                    System.in.read();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                return null;
            });
        };
    }

}

@Component
class Configurer {

    Configurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
        factory.getContainerProperties().setCommitLogLevel(Level.INFO);
    }

}

and

spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.consumer.auto-offset-reset=earliest

and

foo0@56
2019-12-04 10:07:18.551  INFO 55430 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-1-0=OffsetAndMetadata{offset=57, leaderEpoch=null, metadata=''}}
foo1@57
FOO0
2019-12-04 10:07:18.558  INFO 55430 --- [      bar-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-2-0=OffsetAndMetadata{offset=63, leaderEpoch=null, metadata=''}}
2019-12-04 10:07:20.562  INFO 55430 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Sending offsets to transaction: {so59152915-1-0=OffsetAndMetadata{offset=58, leaderEpoch=null, metadata=''}}
foo2@58
0 votes

Please pay attention to your auto-commit setup. As I see, you set it to false:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

So, in this situation, you need to commit "manually" or set auto commit to true.
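For reference, a minimal sketch of a manual commit, assuming the container factory's ack mode is changed to AbstractMessageListenerContainer.AckMode.MANUAL and process(...) is a hypothetical handler:

@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String msg, Acknowledgment ack) {
    process(msg); // hypothetical business logic
    ack.acknowledge(); // commits the offset for this record
}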