0
votes

I am trying to convert the following spring integration project to a java config version using the Spring Integration dsl. I'm not having much luck and I can't find documentation on the dsl that helps me understand the framework enough to get through this.

Here's the project I'm converting. It uses xml config. https://github.com/dsyer/http-amqp-tunnel

Basically it takes an http request then tunnels it through rabbitmq to a target app on the other side. A good description of what the project should do can be found on the link above.

The main differences between my app and the one on github I listed above are that mine is based on spring boot 1.5.1.RELEASE and the original is on 1.1.4.BUILD-SNAPSHOT. Also, the original project uses the spring integration xml namespace support namely int-http:inbound-gateway, int-http:outbound-gateway, int-amqp:outbound-gateway and int-amqp:inbound-gateway whereas I'm using the IntegrationFlow dsl in a java config.

My code never even puts a message on RabbitMQ and I get a timeout exception in the browser so I think my IntegrationFlow setup is incorrect. I have added a wire tap that logs the requests and I only see the output of one wire tap when I hit the app from a browser.

Just a nudge in the right direction would be greatly appreciated.

Updated config and error

package org.springframework.platform.proxy;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.*;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.http.Http;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;
import org.springframework.web.client.RestTemplate;

@Configuration
@ComponentScan
@EnableAutoConfiguration
@EnableIntegration
public class TunnelApplication 
{
    public static void main(String[] args) 
    {
        SpringApplication.run(TunnelApplication.class, args);
    }

    @Value("${urlExpression}")
    private String urlExpression;

    @Value("${targetUrl}")
    private String targetUrl;

    @Value("${outboundQueue}")
    private String outboundQueue;

    @Value("${inboundQueue}")
    private String inboundQueue;

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Bean
    public Queue requestQueue() 
    {
        return new Queue(outboundQueue, true, false, true);
    }

    @Bean
    public Queue targetQueue() 
    {
        return new Queue(inboundQueue, true, false, true);
    }

    @Bean
    public RestTemplate safeRestTemplate()
    {
        return new RestTemplate();
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }


    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory);
        result.setMessageConverter(jsonMessageConverter());
        return result;
    }

    @Bean
    public IntegrationFlow httpInboundGateway()
    {
        return IntegrationFlows
                .from(Http.inboundGateway("/tunnel"))
                .handle(
                        Amqp.outboundAdapter(amqpTemplate())
                            .mappedRequestHeaders("http_*")
                            .routingKey(outboundQueue)
//                          .routingKeyExpression("headers['routingKey']")
                        )
                .wireTap(f->f.handle(logger("outbound")))
                .get();
    }

    @Bean
    public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) 
    {
        return IntegrationFlows.from
                (
                    Amqp.inboundGateway(connectionFactory, inboundQueue)
                        .mappedRequestHeaders("http_*")
                        .messageConverter(jsonMessageConverter())
                )
                .handle(Http.outboundGateway(targetUrl))
                .wireTap(f->f.handle(logger("inbound")))
                .get();
    }


    @Bean
    public MessageHandler logger(String name) 
    {
         LoggingHandler loggingHandler =  new LoggingHandler(LoggingHandler.Level.INFO.name());
         loggingHandler.setLoggerName(name);
         return loggingHandler;
    }
}

The following error message continues to get printed and there is a message that stays on RabbitMQ while the app is running. It looks like it is pulling it off and getting an error and then putting it back on. That concerns me because I want any errors to propagate back to the originating client and not bog down the server.

2017-02-06 16:00:12.167  INFO 10264 --- [nio-9000-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-02-06 16:00:12.167  INFO 10264 --- [nio-9000-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization started
2017-02-06 16:00:12.190  INFO 10264 --- [nio-9000-exec-1] o.s.web.servlet.DispatcherServlet        : FrameworkServlet 'dispatcherServlet': initialization completed in 23 ms
2017-02-06 16:00:16.806  INFO 10264 --- [erContainer#0-1] outbound                                 : <200 OK,{X-Application-Context=[application], Content-Type=[text/html;charset=UTF-8], Content-Length=[14], Date=[Mon, 06 Feb 2017 22:00:16 GMT]}>
2017-02-06 16:00:16.810  WARN 10264 --- [erContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_66]
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test:9000.amqpInboundGateway.channel#1'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=<200 OK,{X-Application-Context=[application], Content-Type=[text/html;charset=UTF-8], Content-Length=[14], Date=[Mon, 06 Feb 2017 22:00:16 GMT]}>, headers={http_requestMethod=GET, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2eb9b1c6, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@2eb9b1c6, amqp_consumerQueue=request, http_requestUrl=http://localhost:9000/tunnel/, id=bcb94ed9-45fc-c333-afee-de6e20a9f1b5, Content-Length=14, amqp_consumerTag=amq.ctag-ncEDSKdgWNKQk-jhGfqsbw, contentType=text/html;charset=UTF-8, http_statusCode=200, Date=1486418416000, timestamp=1486418416805}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) ~[spring-messaging-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:441) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:409) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$400(AmqpInboundGateway.java:52) ~[spring-integration-amqp-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.amqp.inbound.AmqpInboundGateway$1.onMessage(AmqpInboundGateway.java:154) ~[spring-integration-amqp-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.7.RELEASE.jar:4.3.7.RELEASE]
    ... 35 common frames omitted

Config based on Gary's comment Modified to hit the /beans endpoint of a spring boot actuator.

@Bean
public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) {
    return IntegrationFlows.from(Http.inboundGateway("/tunnel"))
            .log()
            .handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName()))
            .log()
            .bridge(null)
            .get();
}

@Bean
public Queue queue() {
    return new AnonymousQueue();
}

@Bean
public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue()))
            .log()
            .handle(Http.outboundGateway("http://localhost:8080/beans")
                    .expectedResponseType(String.class))
            .log()
            .bridge(null)
            .get();
}

@Bean
public IntegrationFlow finalWeb() {
    return IntegrationFlows.from(Http.inboundGateway("/beans"))
            .log()
            .<String, String>transform(String::toUpperCase)
            .log()
            .bridge(null)
            .get();
}
2

2 Answers

1
votes

For request/reply interaction you have to use Amqp.outboundGateway. That is what Dave have in its sample:

<int-amqp:outbound-gateway request-channel="outbound"
      routing-key="${outboundQueue}" mapped-request-headers="http_*" />

Plus, look, you have missed here a routingKey, which according to Dave's logic must be outboundQueue.

The Http.inboundGateway and that Amqp.outboundGateway can be combined to one IntegrationFlow:

@Bean
public IntegrationFlow clientGateway() {
    return IntegrationFlows
            .from(Http.inboundGateway("/tunnel"))
            .handle(Amqp.outboundGateway(amqpTemplate)
                        .mappedRequestHeaders("http_*")
                        .routingKey(outboundQueue))
            .get();
}

The server part can be combined to a single IntegrationFlow as well. And its components looks good for me.

You really expect a reply from your rest service, so, all your downstream components must be request/reply.

Just let's take a look to the design one more time!

-------------   HTTP   -------------   AMQP    -------------   AMQP   -------------   HTTP   --------------
| local app | <------> |   client  | <------>  |   broker  | <------> |   server  | <------> | target app | 
-------------          -------------           -------------          -------------          --------------
0
votes
@SpringBootApplication
public class So42077149Application {

    public static void main(String[] args) {
        SpringApplication.run(So42077149Application.class, args);
    }

    @Bean
    public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) {
        return IntegrationFlows.from(Http.inboundGateway("/foo"))
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName()))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    @Bean
    public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue()))
                .log()
                .handle(Http.outboundGateway("http://localhost:8080/bar")
                        .expectedResponseType(String.class))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow finalWeb() {
        return IntegrationFlows.from(Http.inboundGateway("/bar"))
                .log()
                .<String, String>transform(String::toUpperCase)
                .log()
                .bridge(null)
                .get();
    }


}

Result:

$ curl -H "Content-Type: text/plain" -d foo localhost:8080/foo
FOO

EDIT

And with JSON...

otApplication public class So42077149Application {

    public static void main(String[] args) {
        SpringApplication.run(So42077149Application.class, args);
    }

    @Bean
    public IntegrationFlow webToRabbit(RabbitTemplate amqpTemplate) {
        return IntegrationFlows.from(Http.inboundGateway("/foo"))
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .routingKey(queue().getName())
                        .mappedRequestHeaders("*")
                        .mappedReplyHeaders("*"))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    @Bean
    public IntegrationFlow rabbitToWeb(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue()))
                .log()
                .handle(Http.outboundGateway("http://localhost:8080/bar")
                        .mappedRequestHeaders("*")
                        .mappedResponseHeaders("*")
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(Map.class))
                .log()
                .log(Level.INFO, "payloadClass", "payload.getClass()")
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow finalWeb() {
        return IntegrationFlows.from(Http.inboundGateway("/bar"))
                .log()
                .transform("{ \"foo\" : \"bar\" }")
                .enrichHeaders(h -> h.header("contentType", "application/json"))
                .log()
                .bridge(null)
                .get();
    }

}