
I have used Spring Integration Kafka for a simple read-process-write scenario. The configuration of the message-driven endpoint bean is as follows:

<kafka:message-driven-channel-adapter listener-container="listenerContainer"
                                      channel="processChannel"
                                      mode="batch"/>

<bean id="listenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.ContainerProperties">
                <constructor-arg name="topics" value="test"/>
                <property name="transactionManager" ref="kafkaTransactionManager"/>
                <property name="eosMode" value="BETA"/>
            </bean>
        </constructor-arg>
</bean>

When a batch of records (e.g., 10 records) is polled by the KafkaMessageListenerContainer, a Kafka transaction is started, but all 10 records are combined into a single message by IntegrationBatchMessageListener:

message = toMessagingMessage(records, acknowledgment, consumer);

Is there a way for the batch to be processed in a single Kafka transaction, but with each record handled individually by the message-driven endpoint, committing the transaction only after all records have been processed?

I would prefer to avoid manual acks if possible.

I have seen the BatchToRecordAdapter class, but I do not think it is available for the message-driven endpoint.


1 Answer


No; but you can add a <splitter input-channel="processChannel" output-channel="..."/> after the adapter to break the List payload into separate messages.
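A minimal sketch of that approach, matching the question's configuration (the recordChannel and recordHandler names are assumptions for illustration):

<int:splitter input-channel="processChannel" output-channel="recordChannel"/>

<!-- Hypothetical downstream endpoint that handles one record at a time.
     The adapter emits a single message whose payload is the List of
     ConsumerRecords; the default splitter turns that List into one
     message per record. Because the splitter and the downstream flow
     run on the listener container's thread, each record is still
     processed inside the same Kafka transaction, which commits only
     after the whole batch has flowed through. -->
<int:service-activator input-channel="recordChannel" ref="recordHandler"/>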