I'm using spring-integration-kafka 1.1.0 with the following config. I don't quite understand about the streams
config. When I increase this, does Spring automatically spawn more threads to handle the messages? e.g. when I have streams=2
, does the correlated transformer and service-activator all run in 2 threads? I feel like missing some thread-executor configurations, but not sure how. Any hint is appreciated. Thanks.
<int:poller default="true" fixed-delay="10"/>
<int:channel id="tag.track">
</int:channel>
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapterForTagTrack" kafka-consumer-context-ref="consumerContextForTagTrack" auto-startup="true" channel="tag.track">
</int-kafka:inbound-channel-adapter>
<int-kafka:consumer-context id="consumerContextForTagTrack"
consumer-timeout="${kafka.consumer.timeout}" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="${kafka.consumer.group.track}" max-messages="200">
<int-kafka:topic id="tag.track" streams="2" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int:channel id="tag.track.transformed">
<int:interceptors>
<int:wire-tap channel="event.logging" />
</int:interceptors>
</int:channel>
<int:transformer id="kafkaMessageTransformerForTagTrack"
ref="kafkaMessageTransformer" input-channel="tag.track" method="transform"
output-channel="tag.track.transformed" />
<int:service-activator input-channel="tag.track.transformed" ref="tagTrackMessageHandler" method="handleTagMessage">
<int:request-handler-advice-chain>
<ref bean="userTagRetryAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
Tried message-driven-channel-adapter, but can't get it work, the following config doesn't pick up any message. Also tried the org.springframework.integration.kafka.listener.KafkaTopicOffsetManager
, it complains Offset management topic cannot have more than one partition
. Also, in this adapter, how to configure the consumer group?
Is there any detailed example on how to use the message-driven-channel-adapter? The instruction on the project page is pretty high level.
<int:channel id="tag.track">
<int:queue capacity="100"/>
</int:channel>
<bean id="kafkaConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
<constructor-arg ref="zookeeperConnect"/>
</bean>
<bean id="connectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
<constructor-arg ref="kafkaConfiguration"/>
</bean>
<bean id="decoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>
<int-kafka:message-driven-channel-adapter
id="adapter"
channel="tag.track"
connection-factory="connectionFactory"
key-decoder="decoder"
payload-decoder="decoder"
max-fetch="100"
topics="tag.track"
auto-startup="true"
/>