
I am trying to understand Kafka exactly-once semantics using a transactional producer/consumer.

I came across the example below, but I still have a hard time understanding exactly-once. Is this code correct?

What does producer.sendOffsetsToTransaction do? Should the offsets it is given refer to the same target topic?

What if the system crashes before consumer.commitSync()? Will the same messages be read again and duplicate messages be produced?

public class ExactlyOnceLowLevel {

    public void runConsumer() throws Exception {
        final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
        final Producer<Long, String> producer = createProducer();

        producer.initTransactions();

        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));

            try {
                final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                producer.beginTransaction();
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\n", record.topic(), record.partition(),
                                record.offset(), record.key(), record.value());

                    final ProducerRecord<Long, String> producerRecord =
                                new ProducerRecord<>(TOPIC_1, new BigInteger(record.key()).longValue(), record.value().toString());
                    // send returns Future
                    final RecordMetadata metadata = producer.send(producerRecord).get();
                    currentOffsets.put(new TopicPartition(TOPIC_1, record.partition()), new OffsetAndMetadata(record.offset()));
                }
                producer.sendOffsetsToTransaction(currentOffsets, "my-transactional-consumer-group"); // a bit annoying here to reference the group id twice
                producer.commitTransaction();
                consumer.commitSync();
                currentOffsets.clear();
                // EXACTLY ONCE!
            }
            catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                e.printStackTrace();
                // We can't recover from these exceptions, so our only option is to close the producer and exit.
                producer.close();
            }
            catch (final KafkaException e) {
                e.printStackTrace();
                // For all other exceptions, just abort the transaction and try again.
                producer.abortTransaction();
            }
            finally {
                producer.flush();
                producer.close();
            }
        }
    }

    private static KafkaConsumer<byte[], byte[]> createConsumer() {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); // must match KafkaConsumer<byte[], byte[]>
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // this has to be read_committed

        return new KafkaConsumer<>(consumerConfig);
    }

    private static Producer<Long, String> createProducer() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // required before initTransactions() can be called
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // retries are safe with idempotence enabled
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // this has to be all
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // conservative; idempotence allows up to 5

        return new KafkaProducer<>(props);
    }

    public static void main(final String... args) throws Exception {

        final ExactlyOnceLowLevel example = new ExactlyOnceLowLevel();
        example.runConsumer();

    }
}

1 Answer


You should not attempt to commit offsets with the Consumer when using the read-process-write pattern with Kafka transactions. As you hinted, this can cause issues.

In this use case, offsets need to be added to the transaction, and you should use only sendOffsetsToTransaction() to do that. That method ensures that these offsets are committed only if the full transaction succeeds. See the Javadoc:

Sends a list of specified offsets to the consumer group coordinator, and also marks those offsets as part of the current transaction. These offsets will be considered committed only if the transaction is committed successfully. The committed offset should be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.

This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified consumerGroupId should be the same as config parameter group.id of the used consumer. Note, that the consumer should have enable.auto.commit=false and should also not commit offsets manually (via sync or async commits).
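Putting that together, here is a minimal sketch of the corrected poll loop. It reuses TOPIC, TOPIC_1 and the createConsumer()/createProducer() helpers from your question (with a transactional.id set on the producer, and assuming the record values are UTF-8 strings). Three things change: the offsets map is keyed by the source topic/partition (record.topic(), not TOPIC_1), the committed offset is record.offset() + 1, and the group id passed to sendOffsetsToTransaction() matches the consumer's group.id ("my-group"). There is no consumer.commitSync() anywhere:

final KafkaConsumer<byte[], byte[]> consumer = createConsumer();
final Producer<Long, String> producer = createProducer();

producer.initTransactions();
consumer.subscribe(Collections.singletonList(TOPIC));

while (true) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) {
        continue; // nothing polled, nothing to commit
    }

    final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    producer.beginTransaction();
    try {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            producer.send(new ProducerRecord<>(TOPIC_1,
                    new BigInteger(record.key()).longValue(),
                    new String(record.value(), StandardCharsets.UTF_8)));
            // Key by the SOURCE partition and commit the NEXT offset to read.
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // Offsets are committed atomically with the produced records,
        // so the consumer itself never commits anything.
        producer.sendOffsetsToTransaction(offsets, "my-group");
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        producer.close(); // unrecoverable; close and exit
        throw e;
    } catch (final KafkaException e) {
        // Recoverable: abort, then seek the consumer back to its last
        // committed offsets if you want to reprocess this batch.
        producer.abortTransaction();
    }
}

With this shape, your crash scenario is covered: if the process dies before commitTransaction(), the transaction is aborted, a read_committed consumer never sees the partial output, and the group's offsets stay where they were, so the batch is simply re-read and re-written in a new transaction rather than duplicated.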