We are using spring-integration-kafka version 3.1.2.RELEASE and int-kafka:message-driven-channel-adapter to consume messages from the remote kafka topic. The producer sends the encrypted message and we are decrypting the actual message using deserializer. We are able to consume all the messages posted in the topic. We have used the auto commit as false. We would like to know how to commit or acknowledge the message from our service after successfully processed the message. Can someone help us how to commit the messages read from message driven channel and provide some reference implementation ?
When we set the auto commit to true, we assume that it will commit the message after the commit interval but we would like to handle it in our service. I have come across the below example but we receive a custom object after deserialization rather spring integration message. so we would like to know how to implement the similar acknowledgement in the transformer so that we will not commit the message in case of any errors during the transformation. Commit the message after successful transformation.
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment != null) { System.out.println("Acknowledgment provided");
acknowledgment.acknowledge(); }
}
<int-kafka:message-driven-channel-adapter
id="kafkaMessageListener"
listener-container="kafkaMessageContainer" auto-startup="true"
phase="100" send-timeout="5000" mode="record"
message-converter="messageConverter"
recovery-callback="recoveryCallback" error-message-strategy="ems"
channel="inputFromKafkaChannel" error-channel="errorChannel" />
<int:transformer id="transformerid"
ref="transformerBean"
input-channel="inputFromKafkaChannel" method="transform"
output-channel="messageTransformer" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="${spring.kafka.consumer.group-id}" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="com.test.CustomDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="${spring.kafka.topics}" />
</bean>
</constructor-arg>
</bean>