0
votes

I am using Spring JMS, and I would like to turn off the retrieval of messages from the queue while still allowing for publishing of new messages on that same (or other) queues from within those listeners.

I have a @JmsListener method which will pull messages off of the queue, and process them. This Listener, through various stages, will publish new messages using JmsTemplate.convertAndSend()

@JmsListener(destination = "queue-name")
public void processJob(Object job) {
    jmsTemplate.convertAndSend("begin");
    // Do some stuff
    jmsTemplate.convertAndSend("almost-done");
    // More stuff!
    jmsTemplate.convertAndSend("done");
}

I have a very simple listener factory as well:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory myFactory, MessageConverter jacksonJmsMessageConverter) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(myFactory);
    factory.setMessageConverter(jacksonJmsMessageConverter);
    return factory;
}

Now, what I want is to STOP the listener from pulling messages off of the queue, while still allowing for the convertAndSend calls within the method to continue successfully.

The only thing I have seen, to stop a queue, is invoking jmsListenerEndpointRegistry.stop(); on the JmsListenerEndpointRegistry bean. This will stop the queue properly, and no messages will be pulled. However, it seems that stopping the JmsListenerEndpointRegistry will also result in messages that I try to publish with JmsTemplate.convertAndSend to fail to publish.

When I try to convertAndSend a message AFTER calling jmsListenerEndpointRegistry.stop(); I get the following error:

o.a.activemq.ActiveMQSessionExecutor     : Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()?

This error makes sense, I suppose. Stopping the endpoint registry not only shuts down the functionality for listeners, but also the functionality for publishing messages.

So my question is this: Given the above scenario with the listener, how can I shut down the Listener so that no new messages are processed, while still allowing for the JmsTemplate.convertAndSend messages to be successfully published?

1
Show your template configuration.Gary Russell
I don't have any specific configuration for the template. Whatever Spring JMS is doing by default - that's what I'm working with. Currently I'm just using the in-memory embedded JMS spring.activemq.in-memory=truePatrick D
Spring JMS doesn't configure a template by default, but Spring Boot does; it looks like you are using that. Stopping the container should not affect the template, given your configuration. I'll play around with it to see if I can reproduce.Gary Russell
You're correct; I am using Spring Boot.Patrick D

1 Answers

1
votes

There is something unusual about your application/environment. This works fine...

@SpringBootApplication
public class So59863889Application {

    private static final Logger LOG = LoggerFactory.getLogger(So59863889Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So59863889Application.class, args);
    }

    @Autowired
    JmsTemplate template;

    @JmsListener(id = "foo", destination = "foo")
    public void listen1(String in) throws InterruptedException {
        LOG.info(in);
        Thread.sleep(5_000);
        this.template.convertAndSend("bar", in.toUpperCase());
    }

    @JmsListener(id = "bar", destination = "bar")
    public void listen2(String in) throws InterruptedException {
        LOG.info(in);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template, JmsListenerEndpointRegistry registry) {
        return args -> {
            IntStream.range(0, 20).forEach(i -> template.convertAndSend("foo", "test" + i));
            System.in.read();
            LOG.info("Stopping foo");
            registry.getListenerContainer("foo").stop();
        };
    }

}

The send after the sleep() (after stopping the container works fine).

I get the same result when I override Boot's container factory, but explicitly referencing ActiveMQConnectionFactory fails for me (no such bean); I have to change it to simply ConnectionFactory.

If you can provide a simple app (like mine) that reproduces the issue, I can look further.