2
votes

Using spring-integration 5.0.7 to throttle the bridging of msgs between two JMS queues.

The docs at: https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/messaging-channels-section.html#bridge-namespace

suggest:

<int:bridge input-channel="pollable" output-channel="subscribable">
     <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
 </int:bridge>

But schema validator complains "no nested poller allowed for subscribable input channel" on bridge elt.

But, if I put the poller on the input-channel-adapter as in:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
 ">
    <int:channel id="inChannel" />
    <int:channel id="outChannel" />

    <int-jms:inbound-channel-adapter id="jmsIn" connection-factory="jmsConnectionFactory" destination-name="_dev.inQueue" channel="inChannel">
       <int:poller fixed-delay="5000" max-messages-per-poll="2"/>
    </int-jms:inbound-channel-adapter>

    <int-jms:outbound-channel-adapter id="jmsOut" connection-factory="jmsConnectionFactory" destination-name="_dev.outQueue" channel="outChannel"/>
    <int:bridge input-channel="inChannel" output-channel="outChannel">
    </int:bridge>
</beans:beans>

Nothing is ever moved from input to output.

How can I bridge from one JMS queue to another with a rate-limit?

Update:

Turning on logging confirms nothing getting picked up from input channel but otherwise not helpful:

018-08-10 15:36:33.345 DEBUG 112066 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2018-08-10 15:36:38.113 DEBUG 112066 --- [ask-scheduler-2] o.s.integration.jms.DynamicJmsTemplate   : Executing callback on JMS Session: ActiveMQSession {id=ID:whitechapel-35247-1533940593148-3:2:1,started=true} java.lang.Object@5c278302
2018-08-10 15:36:38.116 DEBUG 112066 --- [ask-scheduler-2] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2018-08-10 15:36:43.115 DEBUG 112066 --- [ask-scheduler-1] o.s.integration.jms.DynamicJmsTemplate   : Executing callback on JMS Session: ActiveMQSession {id=ID:whitechapel-35247-1533940593148-3:3:1,started=true} java.lang.Object@1c09a81e
2018-08-10 15:36:43.118 DEBUG 112066 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
1
You don't need a bridge, you can just use one channel. That said, it should work OK. Turn on DEBUG logging to see if it gives you any clues.Gary Russell
I guess if I wanted to handle the msg in spring-integration, yes, I wouldnt need to bridge to another queue...but for now I want to keep the current consumer.mmeyer
In the code above, you are simply bridging in the inbound adapter's channel to the outbound adapter's channel. It's unnecessary; both adapters can use the same channel; the inbound adapter's poller will send the message directly to the outbound adapter.Gary Russell
Ok, tried using one channel. Made no difference. Are we sure that polling is supported on inbound-channel-adapter using a JmsConnectionFactory? If I use a straight message-driven-channel-adapter it works fine, but of course no throttling.mmeyer
I said it wouldn't make a difference; the bridge is just not needed. As I also said, DEBUG logging will show you messaging activity.Gary Russell

1 Answers

1
votes

Here is a Spring Boot app, using Java DSL configuration which is the exact equivalent of what you have in XML (minus the bridge); it works fine.

@SpringBootApplication
public class So51792909Application {

    private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So51792909Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            for (int i = 0; i < 10; i++) {
                template.convertAndSend("foo", "test");
            }
        };
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
                        .destination("foo"), e -> e
                            .poller(Pollers
                                    .fixedDelay(5000)
                                    .maxMessagesPerPoll(2)))
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("bar"))
                .get();
    }

    @JmsListener(destination = "bar")
    public void listen(String in) {
        logger.info(in);
    }

}

and

2018-08-10 19:38:52.534  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:52.543  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:57.566  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:57.582  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:02.608  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:02.622  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:07.640  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:07.653  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:12.672  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:12.687  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test

As you can see, the consumer gets 2 messages every 5 seconds.

Your debug log implies there are no messages in the queue.

EDIT

I figured it out; the XML parser sets the JmsTemplate receiveTimeout to nowait (-1). Since you are not using a caching connection factory, we'll never get a message because the ActiveMQ client returns immediately if there's not already a message present in the client (see this answer). Since there's no caching going on, we get a new consumer on every poll (and do a no-wait receive each time).

The DSL leaves the JmsTemplate's default (Infinite wait - which is actually wrong since it blocks the poller thread indefinitely if there are no messages).

To fix the XML version, adding receive-timeout="1000" fixes it.

However, it's better to use a CachingConnectionFactory to avoid creating a new connection/session/consumer on each poll.

Unfortunately, configurating a CachingConnectionFactory turns off Spring Boot's auto-configuration. This is fixed in Boot 2.1.

I have opened an issue to resolve the inconsistency between the DSL and XML here.

If you stick with the DSL, I would recommend setting the receive timeout to something reasonable, rather than indefinite:

@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
                    .configureJmsTemplate(t -> t.receiveTimeout(1000))
                    .destination("foo"), e -> e
                        .poller(Pollers
                                .fixedDelay(5000)
                                .maxMessagesPerPoll(2)))
            .handle(Jms.outboundAdapter(connectionFactory)
                    .destination("bar"))
            .get();
}

But, the best solution is to use a CachingConnectionFactory.