I'm using the following confing:
- spring-integration-kafka 2.1.0.RELEASE
- kafka-clients 0.10.0.1
- Kafka 0.10.x.x
- spring-xd-1.3.1.RELEASE
I created my custom Kafka-source module for SpringXD. I set my consumer logic and my message-driven-channel-adapter
(which I'm using in conjunction with a control-bus
to stop my channel adapter). So far so good. Also I'm using as a kafka property max.poll.record=10
to fetch 10 records per poll.
I would like to make sure that I'm stopping my channel right after all records (in this case 10 records) have been successfully fetched.
So for example: I would like to AVOID Stopping reading when not all records are been successfully fetched and processed (that is, when the records are not been sent to the output channel).
Is there a way to tell that?
This is my xml config, just in case:
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<int:channel id="input_to_control_bus" />
<int:channel id="output" />
<context:component-scan base-package="com.kafka.source.logic" />
<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />
<int-kafka:message-driven-channel-adapter
id="kafkaInboundChannelAdapterTesting" listener-container="container1"
auto-startup="false" phase="100" send-timeout="5000" channel="output"
mode="record" message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!--Consumer -->
<bean id="container1"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="max.poll.records" value="3" />
<entry key="group.id" value="bridge-stream-testing" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="testing-topic" />
</bean>
</constructor-arg>
</bean>
[UPDATE N°1] Why do I want to do this? These are the details:
- I would like to read at most X messages every Y minutes from a Kafka topic.
- I use
max.poll.records
to ensure that I'm fetching at most X messages per poll. - One scenario that I would like to handle is: what happens if in one specific polling of messages, I poll less messages than X. That implies that I should stop the channel without waiting for X messages, otherwise I would have to wait until a future poll of messages to reach those X messages.
Those are some details about this scenario. There are more scenarios but I don't want to mix it using the same SO question.
[UPDATE N°2]
Some thoughts after Artem's answer.
- What happens if I don't define a
max.poll.records
and just wait until having reached Y minutes and having counted X messages, and thenstop
the channel? - Do some messages would be lost because could not be read, or those message that couldn't be read will be read when I
start
again the channel?
I want to avoid to keep messages in memory, that is the reason I was using message-driven-channel-adapter
+ max.poll.records