I have used spring integration kafka for a simple scenario of read-process-write. the configs of the message driven endpoint bean is as follows:
<kafka:message-driven-channel-adapter listener-container="listenerContainer"
channel="processChannel"
mode="batch"/>
<bean id="listenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="test"/>
<property name="transactionManager" ref="kafkaTransactionManager"/>
<property name="eosMode" value="BETA"/>
</bean>
</constructor-arg>
</bean>
When a batch of records (e.g., 10 records) is polled by KafkaMessageListenerContainer a kafka transaction will be started, but all the 10 records will be integrated into single message by IntegrationBatchMessageListener
message = toMessagingMessage(records, acknowledgment, consumer);
Is there any solution which a batch to be processed in a single kafka transaction but each record to be processed individually in message driven end point and after that the transaction be committed?
I do not want to use manual acks if it is possible.
I have seen class BatchToRecordAdapter but do not think it is available in message driven end point.