I encountered the following situation in a Spring Boot application with JMS over ActiveMQ 5 as broker.
A @JMSListener annotated method processes a message and sends a response message to a different destination. There is also a @JMSListener for this destination, which is not called when the response has been sent to the broker, but only when the processing of the original listner is completely finished. If this listener is additionally annotated with @Async, the reponse is received immediately after sending as expected.
The original project is way too big, so I prepared the minimal example below. It contains a Spring Boot application TestApp with a single @JmsListener (1) which immediately forwards a message from Destination in to out and afterwards sleeps for 3 seconds.
The application is started in a test which sends a single message to in and waits for 2 seconds for the response on out.
Only if the @Async is present at (1) the test is successful.
Further observations:
- Same behaviour if the test uses variant (2) and receives the response via JmsTemplate instead using a JMSListener.
- In any case one can see that the message is present in the broker immediately after sending.
Question: Why is receiving self-sent messages blocked in this situation? How can the outgoing message be received immediately without using @Async?
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);
@Container
public static final GenericContainer<?> ACTIVEMQ =
new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
.withExposedPorts(8161, 61616)
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
.withStartupTimeout(Duration.ofSeconds(60))
.withLogConsumer(new Slf4jLogConsumer(LOG));
@DynamicPropertySource
private static void ports(DynamicPropertyRegistry registry)
{
registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
}
@Autowired
private JmsTemplate jmsTemplate;
private List<String> messages = new LinkedList<>();
@Async
@JmsListener(destination = "out")
public void onOut(String message)
{
LOG.warn("Received message from out: {}", message);
messages.add(message);
}
@Test
public void foo() throws InterruptedException
{
LOG.warn("Sending request");
// Sending some message on destination 'in' to be received and answered by the listener below
jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());
LOG.warn("Waiting for repsonse");
// (2) // Try to receive response from 'out'
// jmsTemplate.setReceiveTimeout(2_000);
// Message response = jmsTemplate.receive("out");
// assertThat(response, notNullValue());
Thread.sleep(2_000);
assertThat(messages, hasSize(1));
}
}
@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);
public static void main(String[] args)
{
SpringApplication.run(TestApp.class, args);
}
@Autowired
private JmsTemplate jmsTemplate;
// (1)
// @Async
@JmsListener(destination = "in")
public void onIn(String message) throws InterruptedException
{
LOG.warn("Received message from in: {}", message);
jmsTemplate.convertAndSend("out", message);
LOG.warn("Sent Response");
LOG.warn("Sleeping ...");
Thread.sleep(3_000);
LOG.warn("Finished");
}
}
Here the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>foo</groupId>
<artifactId>jmstest</artifactId>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.5.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
convertAndSend
does not commit the message, so the listener can not see it? – Daniel Steinmann