I'm using spring-integration-kafka 1.1.0 with the following config, and I don't quite understand the streams setting. When I increase it, does Spring automatically spawn more threads to handle the messages? For example, with streams=2, do the associated transformer and service-activator both run on 2 threads? I feel like I'm missing some thread-executor configuration, but I'm not sure what. Any hint is appreciated. Thanks.

<int:poller default="true" fixed-delay="10"/>

<int:channel id="tag.track">
</int:channel>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapterForTagTrack" kafka-consumer-context-ref="consumerContextForTagTrack" auto-startup="true" channel="tag.track">
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContextForTagTrack"
	consumer-timeout="${kafka.consumer.timeout}" zookeeper-connect="zookeeperConnect">
	<int-kafka:consumer-configurations>
		<int-kafka:consumer-configuration group-id="${kafka.consumer.group.track}" max-messages="200">
			<int-kafka:topic id="tag.track" streams="2" />
		</int-kafka:consumer-configuration>
	</int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int:channel id="tag.track.transformed">
	<int:interceptors>
		<int:wire-tap channel="event.logging" />
	</int:interceptors>
</int:channel>

<int:transformer id="kafkaMessageTransformerForTagTrack"
	ref="kafkaMessageTransformer" input-channel="tag.track" method="transform"
	output-channel="tag.track.transformed" />

<int:service-activator input-channel="tag.track.transformed" ref="tagTrackMessageHandler" method="handleTagMessage">
	<int:request-handler-advice-chain>
		<ref bean="userTagRetryAdvice" />
	</int:request-handler-advice-chain>
</int:service-activator>

I tried the message-driven-channel-adapter but can't get it to work; the following config doesn't pick up any messages. I also tried org.springframework.integration.kafka.listener.KafkaTopicOffsetManager, but it complains "Offset management topic cannot have more than one partition". Also, how do I configure the consumer group with this adapter? Is there a detailed example of how to use the message-driven-channel-adapter? The instructions on the project page are pretty high level.

<int:channel id="tag.track">
	<int:queue capacity="100"/>
</int:channel>

<bean id="kafkaConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
	<constructor-arg ref="zookeeperConnect"/>
</bean>

<bean id="connectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
	<constructor-arg ref="kafkaConfiguration"/>
</bean>

<bean id="decoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>

<int-kafka:message-driven-channel-adapter
		id="adapter"
		channel="tag.track"
		connection-factory="connectionFactory"
		key-decoder="decoder"
		payload-decoder="decoder"
		max-fetch="100"
		topics="tag.track"
		auto-startup="true"
		/>

1 Answer

The streams property has nothing to do with Spring itself; it's simply passed to Kafka when invoking ConsumerConnector.createMessageStreams() (each topic/streams entry is passed in the map argument).

Refer to the Kafka documentation.

EDIT:

When using the high-level consumer, the Kafka inbound channel adapter is polled, so the threads on which the downstream integration flow runs are unrelated to the Kafka client threads; they are determined by the poller configuration.
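As a rough sketch of one way to put the downstream flow on its own threads (not necessarily the only or best option), the input channel can be given a dispatcher backed by a task executor. The bean name tagTrackExecutor and the pool size are assumptions for illustration, and the fragment assumes the Spring task namespace is declared in the context file:

```xml
<!-- Hypothetical executor; the id and pool-size are illustrative, not taken from the question -->
<task:executor id="tagTrackExecutor" pool-size="2"/>

<!-- With a dispatcher task-executor, each message sent to tag.track is handed
     to a thread from the pool instead of being handled on the poller thread -->
<int:channel id="tag.track">
	<int:dispatcher task-executor="tagTrackExecutor"/>
</int:channel>
```

With this in place, the transformer subscribed to tag.track runs on an executor thread, and because tag.track.transformed is a direct channel, the service-activator for a given message runs on that same thread. The streams value on the topic is still only passed through to Kafka and does not change this.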

You could consider using the message-driven channel adapter instead.