
I am trying to create an integration flow around a JMS messageDriverChannelAdapter that forwards messages to a Kafka server. I am stuck converting the XML configuration to the equivalent Java DSL code. Can anyone give me a pointer? I am not able to proceed from here.

I have created a MessageListenerContainer like this:

String brokerUrl = "tcp://101.11.102.125:31316";
String topic = "sometpoic"; 
String kafkaBrokerUrl = "101.11.102.125:1012";
String kafkaTopic = "kafka_Topic";

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory();
    ActiveMQTopic mqTopic = new ActiveMQTopic(topic);           
    conFactory.setBrokerURL(brokerUrl);
    container.setConnectionFactory(conFactory);
    container.setDestination(mqTopic);
    container.setSessionTransacted(true);
    return container;
}

These are my input and output channels:

@Bean
public MessageChannel jmsInChannel() {
     return MessageChannels.publishSubscribe().get();
}

@Bean
public MessageChannel jmsOutChannel() {
     return MessageChannels.publishSubscribe().get();
}

And this is my JMS adapter flow:

@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel())                                              
                   .get();
}

Now I need to create a header-enricher like this, but I am not able to convert it to the DSL:

<int:header-enricher input-channel="jmsInChannel" output-channel="jmsOutChannel">
    <int:header name="kafkaBrokerUrl" value="${kafka.url}"></int:header>
    <int:header name="kafkaTopic" value="${kafka.topic}"></int:header>
</int:header-enricher>

I also need to create a service-activator that calls a Kafka producer method from a different class, like this in XML:

<int:service-activator input-channel="jmsOutChannel" ref="KafkaProducer" method="produceToJmsKafka"/>
<bean id="KafkaProducer" class="com.david.jms.JmsKafkaProducer"/>

How do I convert the XML above to the equivalent DSL code?

After getting a compilation error, I tried this:

@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel())
                   .enrichHeaders(new MapBuilder()
                                .put("brokerid", brokerid)
                                .put("topic", topic)
                                .put("source", source)
                                .put("fileType", fileType))
                   .handle("KafkaProducer", "produceToJmsKafka")
                   .get();
}

@Bean
public JmsProducer KafkaProducer() {
    return new JmsProducer();
}   

1 Answer

It could look like this:

@Value("${kafka.url}")
private String kafkaBrokerUrl;

@Value("${kafka.topic}")
private String kafkaTopic;

....
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel()) 
                   .enrichHeaders(new StringStringMapBuilder()
                                            .put("kafkaBrokerUrl", kafkaBrokerUrl)
                                            .put("kafkaTopic", kafkaTopic))
                   .handle("KafkaProducer", "produceToJmsKafka")
                   .get();
}

Given that, I don't see a reason to keep those MessageChannel beans, especially not as publishSubscribe() channels.

On the other hand, since DSL 1.1 we provide an implementation for the Spring Integration Kafka adapters.
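
For reference, here is a rough plain-Java model of what the enrichHeaders and handle steps do to each message in the flow above. It is only a sketch with no Spring dependency: SimpleMessage, the body of produceToJmsKafka, the sample header values (localhost:9092 and so on), and the choice to leave existing headers untouched are all assumptions made for illustration, not the framework's actual classes or your producer's real logic.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderEnricherSketch {

    /** Hypothetical stand-in for a message: a payload plus read-only headers. */
    static final class SimpleMessage {
        final String payload;
        final Map<String, Object> headers;

        SimpleMessage(String payload, Map<String, ?> headers) {
            this.payload = payload;
            // Defensive copy so later changes to the caller's map do not leak in.
            this.headers = Collections.unmodifiableMap(new LinkedHashMap<String, Object>(headers));
        }
    }

    /**
     * Models the enrichHeaders step: builds a NEW message carrying the extra headers.
     * Assumption of this sketch: headers already present on the message are kept.
     */
    static SimpleMessage enrichHeaders(SimpleMessage in, Map<String, ?> extra) {
        Map<String, Object> merged = new LinkedHashMap<String, Object>(in.headers);
        for (Map.Entry<String, ?> e : extra.entrySet()) {
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        return new SimpleMessage(in.payload, merged);
    }

    /**
     * Hypothetical stand-in for the produceToJmsKafka method invoked by the handle step:
     * it reads the routing information from the headers added upstream.
     */
    static String produceToJmsKafka(SimpleMessage msg) {
        return "send '" + msg.payload + "' to topic " + msg.headers.get("kafkaTopic")
                + " at " + msg.headers.get("kafkaBrokerUrl");
    }

    public static void main(String[] args) {
        // Message as it might arrive from the JMS adapter (header name is illustrative).
        SimpleMessage fromJms =
                new SimpleMessage("hello", Collections.singletonMap("jms_messageId", "ID:1"));

        Map<String, Object> extra = new LinkedHashMap<String, Object>();
        extra.put("kafkaBrokerUrl", "localhost:9092"); // placeholder value
        extra.put("kafkaTopic", "kafka_Topic");

        SimpleMessage enriched = enrichHeaders(fromJms, extra);
        System.out.println(produceToJmsKafka(enriched));
    }
}
```

The point of the model is that the handler never needs the broker URL or topic injected into it directly; it can take them from the message headers, which is why the two steps sit back to back in the flow.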