1
votes

I have transactional and normal Producer in application which are writting to topic kafka-topic as below.

Configuration for transactional Kafka Producer

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the maximum amount of time the client will wait "
         "for the response of a request. If the response is not received before the timeout "
         "elapses the client will resend the request if necessary or fail the request if "
         "retries are exhausted.";.*/
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        /*To avoid duplicate msg*/
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        /*Will wait for ack from broker n all replicas*/
        props.put(ProducerConfig.ACKS_CONFIG, "all");
/*Kafka Transactional Properties */
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); // set transaction id
        return props;
    }

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        return new KafkaProducer<>(producerConfigs());
    }

Normal Producer config are same only ProducerConfig.CLIENT_ID_CONFIG and ProducerConfig.TRANSACTIONAL_ID_CONFIG are not added.

Consumer config is as below

@Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();
        //list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //allows a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_group");
        //automatically reset the offset to the earliest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //Auto commit is set false.Will do manual commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        /*Kafka Transactional Property ->Controls how to read messages written transactionally
         * read_committed - poll transactional messages which have been committed only
         * read_uncommitted - will return all messages, even transactional messages
         * default is read_uncommitted
         * */
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {

        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

As I am setting isolation.level as read_committed so It should consumer only transactional messages from subscribed topic. But is it consuming transactional and non-transactional messages from topic. Do I am missing any configuration so that consumer will only consume transactional messages from subscribed topic. Thanks in advance :-)

1

1 Answers

3
votes

It doesn't work that way. isolation.level only pertains to records committed by transactional producers. All consumers see records published by non-transactional producers.

You need to use two different topics to get the behavior you desire,.