I'm struggling with async RabbitMQ message with callback message.
Here is part of my producer:
@Autowired(required = false)
@Qualifier("rabbitTemplate")
private RabbitTemplate queueTemplate;
@Override
public Response createIncident(String incident) {
LOGGER.info("Sending incident into queue");
queueTemplate.convertAndSend((Object)incident, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo("replyQueue");
return message;
}
});
Message message = queueTemplate.receive();
...
Spring rabbitMQ context
<rabbit:admin connection-factory="rabbitConnFactory" />
<rabbit:queue name="rabbitQueue" />
<rabbit:queue name="replyQueue" />
<rabbit:topic-exchange name="rabbitExchange">
<rabbit:bindings>
<rabbit:binding queue="rabbitQueue" pattern="camel" />
<rabbit:binding queue="replyQueue" pattern="camel" />
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:template id="rabbitTemplate"
connection-factory="rabbitConnFactory" exchange="rabbitExchange" routing-key="camel"
reply-address="replyQueue" reply-queue="replyQueue">
<rabbit:reply-listener/>
</rabbit:template>
<rabbit:listener-container
connection-factory="rabbitConnFactory" concurrency="10">
<rabbit:listener ref="rabbitMessageListener" method="createIncident"
queue-names="rabbitQueue" />
</rabbit:listener-container>
<bean id="rabbitMessageListener" class="com.my.listener.QueueListener" />
And Listener that should receive and respond
@Autowired(required = false)
@Qualifier("rabbitTemplate")
private RabbitTemplate queueTemplate;
public void createIncident() {
queueTemplate.receiveAndReply("replyQueue", new ReceiveAndReplyMessageCallback() {
@Override
public Message handle(Message message) {
String incident = new String(message.getBody());
LOGGER.debug("Queue incoming message: " + incident);
String result = "" + messageHandler.createIncident(incident);
return new Message(result.getBytes(), new MessageProperties());
}
});
It do not want to get inside this handle message in listener. And application container display message
2016-06-22 14:15:30,457 ERROR [AbstractFaultChainInitiatorObserver:115] Error occurred during error handling, give up! org.apache.cxf.interceptor.Fault: No 'queue' specified. Check configuration of RabbitTemplate. at org.apache.cxf.service.invoker.AbstractInvoker.createFault(AbstractInvoker.java:170) at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:136) at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:204) at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:101) at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:58) at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:94) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:272) at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121) at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:249) at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:248) at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:222) at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:153) at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:171) at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:289) at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:209) at javax.servlet.http.HttpServlet.service(HttpServlet.java:650) at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:265) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at com.mobilitymedia.connected.http.core.LoggerFilter.doFilter(LoggerFilter.java:41) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330) at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:118) at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:84) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:113) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:103) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:113) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:154) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:45) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.authentication.preauth.AbstractPreAuthenticatedProcessingFilter.doFilter(AbstractPreAuthenticatedProcessingFilter.java:94) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:50) at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:87) at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:342) at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:192) at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:160) at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:344) at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:261) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122) at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103) at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436) at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078) at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625) at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:316) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:745) Caused by: org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate. at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:1514) at org.springframework.amqp.rabbit.core.RabbitTemplate.receive(RabbitTemplate.java:802) at com.myService.IncidentServiceDefaultImpl.createIncident(IncidentServiceDefaultImpl.java:36) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:188) at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:104) ... 65 more