Last year I integrated spring-integration-kafka 1.0.0M into a Spring MVC project using the XML configuration, and it was very simple to do. Since Spring seems to be moving toward Java configuration (rather than XML), I would like to convert my spring-integration-kafka XML configuration to Java configuration, which the latest version of spring-integration-kafka (1.2.1) supports. The problem is that there are very few complete examples of this online, and the ones I have found appear to be out of date, as far as I can tell. The configuration I have is pretty simple:
<!-- String encoder bean; the producer configuration references it as both key-encoder and value-encoder. -->
<bean id="kafkaStringEncoder"
      class="org.springframework.integration.kafka.serializer.common.StringEncoder" />

<!-- Object mapper obtained by calling the factory method getMapper on ObjectMapperFactory. -->
<bean id="customObjectMapper"
      class="ad.content.api.utils.ObjectMapperFactory"
      factory-method="getMapper" />

<!-- Channel that receives the requests of the gateway's publishConversion method. -->
<int:channel id="kafkaConversionRequest" />
<!-- Properties handed to the producer context; the retry count is read from ${kafka.retries}. -->
<bean id="producerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="message.send.max.retries">${kafka.retries}</prop>
        </props>
    </property>
</bean>
<!-- Producer context with a single producer configuration for the topic pattern "widget-.*". -->
<int-kafka:producer-context id="kafkaWidgetProducerContext"
                            producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <!-- String keys and values, both encoded by kafkaStringEncoder; brokers come from ${kafka.broker}. -->
        <int-kafka:producer-configuration
            topic="widget-.*"
            broker-list="${kafka.broker}"
            key-class-type="java.lang.String"
            key-encoder="kafkaStringEncoder"
            value-class-type="java.lang.String"
            value-encoder="kafkaStringEncoder"
            compression-codec="default"
            async="true" />
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
<!-- Spring Integration gateway for Kafka: publishConversion sends to the kafkaConversionRequest channel. -->
<int:gateway service-interface="ad.content.api.models.kafka.KafkaGateway"
             default-reply-timeout="2000">
    <int:method name="publishConversion"
                request-channel="kafkaConversionRequest" />
</int:gateway>
<!-- Adds the "topic" header (widget-conversion) to each request and forwards it to kafkaToJson. -->
<int:chain input-channel="kafkaConversionRequest"
           output-channel="kafkaToJson">
    <int:header-enricher>
        <int:header name="topic" value="widget-conversion" />
    </int:header-enricher>
</int:chain>
<!-- Converts messages from kafkaToJson to JSON using customObjectMapper and sends them to kafkaOutbound. -->
<int:object-to-json-transformer input-channel="kafkaToJson"
                                output-channel="kafkaOutbound"
                                object-mapper="customObjectMapper" />

<!-- Outbound adapter that publishes through kafkaWidgetProducerContext. -->
<int-kafka:outbound-channel-adapter id="kafkaOutbound"
                                    kafka-producer-context-ref="kafkaWidgetProducerContext" />
Here's what I can figure out so far:
/**
 * Messaging gateway for publishing conversions.
 *
 * <p>Requests are sent to the {@code kafkaConversionRequest} channel, and each
 * message carries a {@code topic} header with the value {@code widget-conversion}.
 */
@MessagingGateway(defaultReplyTimeout = "2000")
public interface KafkaGateway {

    /**
     * Sends the given conversion to the {@code kafkaConversionRequest} channel.
     *
     * @param conversion the conversion to publish
     */
    @Gateway(
            requestChannel = "kafkaConversionRequest",
            headers = @GatewayHeader(name = "topic", value = "widget-conversion"))
    void publishConversion(Conversion conversion);
}
/**
 * Request channel that the gateway sends conversions to.
 *
 * <p>In the XML configuration a chain consumed this channel, enriched the
 * {@code topic} header and forwarded to {@code kafkaToJson}. The header is now
 * set on the gateway itself, so only the forwarding remains; it is expressed
 * here as a bridge. Without it, nothing in the visible Java configuration
 * subscribes to this {@link DirectChannel}, and messages sent to it cannot be
 * delivered.
 *
 * @return a new direct channel registered under the name {@code kafkaConversionRequest}
 */
@Bean(name = "kafkaConversionRequest")
// Fully qualified because no import section is visible in this snippet.
@org.springframework.integration.annotation.BridgeTo("kafkaToJson")
public MessageChannel getConversionRequest() {
    return new DirectChannel();
}
/**
 * Outbound handler that publishes messages through the Kafka producer context.
 *
 * <p>The handler is subscribed to {@code kafkaOutbound}, the channel the JSON
 * transformer writes to. This takes over the role of the XML
 * {@code outbound-channel-adapter}; a handler declared only as a {@code @Bean}
 * is not attached to any channel and would never receive a message.
 *
 * <p>NOTE(review): confirm that this handler version resolves the destination
 * topic from the {@code topic} header set by the gateway; a different header
 * name or a topic expression may be required.
 *
 * @return the message handler bound to the producer context
 * @throws Exception if the producer context cannot be created
 */
@Bean
// Fully qualified because no import section is visible in this snippet.
@org.springframework.integration.annotation.ServiceActivator(inputChannel = "kafkaOutbound")
public KafkaProducerMessageHandler getHandler() throws Exception {
    return new KafkaProducerMessageHandler(getContext());
}
/**
 * Creates the producer context and registers the producer configuration in it
 * under the key {@code "config"}.
 *
 * @return the populated producer context
 * @throws Exception if the producer configuration cannot be created
 */
@Bean
public KafkaProducerContext getContext() throws Exception {
    KafkaProducerContext producerContext = new KafkaProducerContext();
    ProducerConfiguration<String, String> configuration = getConfiguration();
    producerContext.setProducerConfigurations(
            Collections.singletonMap("config", configuration));
    return producerContext;
}
/**
 * Builds the String/String producer configuration from the producer metadata
 * and the producer.
 *
 * @return the producer configuration
 * @throws Exception if the producer cannot be created
 */
@Bean
public ProducerConfiguration<String, String> getConfiguration() throws Exception {
    ProducerMetadata<String, String> metadata = getMetaData();
    Producer<String, String> producer = getProducer();
    return new ProducerConfiguration<String, String>(metadata, producer);
}
/**
 * Transformer endpoint that consumes from {@code kafkaToJson} and sends its
 * result to {@code kafkaOutbound}.
 *
 * <p>NOTE(review): the transformer is built with its default constructor, while
 * the XML configuration passed {@code customObjectMapper}; confirm whether the
 * custom mapper is still required.
 *
 * @return a new object-to-JSON transformer
 */
@Bean
@Transformer(inputChannel = "kafkaToJson", outputChannel = "kafkaOutbound")
public ObjectToJsonTransformer getJsonTransformer() {
    ObjectToJsonTransformer jsonTransformer = new ObjectToJsonTransformer();
    return jsonTransformer;
}
/**
 * Creates the producer metadata for the topic {@code widget-.*} with String
 * keys and String values. The same serializer instance is used for both.
 *
 * @return the producer metadata
 */
@Bean
public ProducerMetadata<String, String> getMetaData() {
    StringSerializer stringSerializer = new StringSerializer();
    ProducerMetadata<String, String> metadata = new ProducerMetadata<String, String>(
            "widget-.*",
            String.class,
            String.class,
            stringSerializer,
            stringSerializer);
    return metadata;
}
/**
 * Creates the producer through a {@code ProducerFactoryBean} built from the
 * producer metadata and the broker address.
 *
 * <p>NOTE(review): the broker address is hard-coded here, whereas the XML
 * configuration read it from {@code ${kafka.broker}} and also supplied
 * {@code producerProperties}; confirm whether both should be carried over.
 *
 * @return the producer obtained from the factory bean
 * @throws Exception if the factory bean fails to create the producer
 */
@Bean
public Producer<String, String> getProducer() throws Exception {
    ProducerFactoryBean<String, String> factory =
            new ProducerFactoryBean<String, String>(getMetaData(), "dev.kafka-broker01:9092");
    return factory.getObject();
}