
We are using spring-integration-kafka 3.1.2.RELEASE with int-kafka:message-driven-channel-adapter to consume messages from a remote Kafka topic. The producer sends encrypted messages, and we decrypt them in a custom deserializer. We can consume all the messages posted to the topic, and we have set auto commit to false. How can we commit (acknowledge) a message from our service after it has been processed successfully? Can someone explain how to commit messages read through the message-driven channel adapter and point to a reference implementation?

When auto commit is set to true, we assume the offset is committed after the commit interval, but we would like to control this in our service. I came across the example below, but after deserialization we receive a custom object rather than a Spring Integration message. We would like to know how to implement a similar acknowledgment in the transformer, so that the message is committed only after a successful transformation and is not committed if any error occurs during the transformation.

  Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
  if (acknowledgment != null) {
      System.out.println("Acknowledgment provided");
      acknowledgment.acknowledge();
  }



<int-kafka:message-driven-channel-adapter
    id="kafkaMessageListener"
    listener-container="kafkaMessageContainer" auto-startup="true"
    phase="100" send-timeout="5000" mode="record"
    message-converter="messageConverter"
    recovery-callback="recoveryCallback" error-message-strategy="ems"
    channel="inputFromKafkaChannel" error-channel="errorChannel" />

<int:transformer id="transformerid"
    ref="transformerBean"
    input-channel="inputFromKafkaChannel" method="transform"
    output-channel="messageTransformer" />

<bean id="kafkaMessageContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="group.id" value="${spring.kafka.consumer.group-id}" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                        value="com.test.CustomDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="${spring.kafka.topics}" />
        </bean>
    </constructor-arg>
</bean>

1 Answer


enable.auto.commit=true means the kafka-clients library commits the offsets itself.

When it is false (preferred with Spring for Apache Kafka), the listener container commits the offsets. By default it does so after each batch of records returned by poll() has been processed, but the behavior is controlled by the container's AckMode property.

See "Committing Offsets" in the Spring for Apache Kafka reference documentation.

If you set the container's AckMode to MANUAL or MANUAL_IMMEDIATE, your application must perform the commits itself, using the Acknowledgment object.
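
With the XML configuration from the question, the AckMode could be set on the ContainerProperties bean that is passed to the container. This is a minimal sketch that reuses the bean definition from the question; it assumes the `ackMode` property is populated from the enum constant name through Spring's usual string-to-enum conversion.

```xml
<bean class="org.springframework.kafka.listener.ContainerProperties">
    <constructor-arg name="topics" value="${spring.kafka.topics}" />
    <!-- MANUAL: the application calls acknowledge(); the container then commits
         the acknowledged offsets once the records from the poll are processed.
         MANUAL_IMMEDIATE: the commit is performed as soon as acknowledge() is called. -->
    <property name="ackMode" value="MANUAL" />
</bean>
```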

When using Spring Integration, the Acknowledgment object is available in the KafkaHeaders.ACKNOWLEDGMENT header.
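
For the transformer in the question, one option is to acknowledge only after the transformation has returned without throwing. The sketch below shows just that ordering and is written to run without Spring on the classpath, so several names are assumptions: `Ack` is a hypothetical stand-in for `org.springframework.kafka.support.Acknowledgment`, the plain header map stands in for the message headers, and `ACK_HEADER` stands in for `KafkaHeaders.ACKNOWLEDGMENT`. In the real transformer bean, the method would instead accept the `Message<?>` (or a parameter annotated with `@Header(KafkaHeaders.ACKNOWLEDGMENT)`) to reach the Acknowledgment object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class AckAfterTransform {

    /** Hypothetical stand-in for org.springframework.kafka.support.Acknowledgment. */
    interface Ack {
        void acknowledge();
    }

    /** Hypothetical header key standing in for KafkaHeaders.ACKNOWLEDGMENT. */
    static final String ACK_HEADER = "acknowledgment";

    /**
     * Applies the transformer first and acknowledges only if it returned normally.
     * If the transformer throws, the exception propagates and acknowledge()
     * is never called for this record.
     */
    static <I, O> O transformThenAck(I payload, Map<String, Object> headers,
                                     Function<I, O> transformer) {
        O result = transformer.apply(payload); // may throw; nothing is acknowledged then
        Object ack = headers.get(ACK_HEADER);
        if (ack instanceof Ack) {              // header is absent unless a manual AckMode is used
            ((Ack) ack).acknowledge();
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(ACK_HEADER, (Ack) () -> System.out.println("offset acknowledged"));

        // Success: the transformer returns, then the acknowledgment fires.
        String upper = AckAfterTransform.<String, String>transformThenAck(
                "payload", headers, p -> p.toUpperCase());
        System.out.println(upper);

        // Failure: the exception propagates and nothing is acknowledged.
        try {
            AckAfterTransform.<String, String>transformThenAck(
                    "payload", headers, p -> { throw new IllegalStateException("bad record"); });
        } catch (IllegalStateException e) {
            System.out.println("not acknowledged: " + e.getMessage());
        }
    }
}
```

Skipping acknowledge() only leaves that offset uncommitted. The consumer does not redeliver the record on its own until a restart, rebalance, or explicit seek, and acknowledging a later record on the same partition commits past it, so failed records usually also need an error-handling or retry strategy.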

In most cases, AckMode.BATCH (default) or AckMode.RECORD should be used and your application doesn't need to be concerned about committing offsets.