I'm trying to make a Flex-based desktop application consume messages from an ActiveMQ topic with a durable subscription, using the JMS bridge of BlazeDS. The basic scenario is as follows:
Messages are produced by other producers in the topic to which the Flex client is subscribed.
The Flex client may go offline from time to time, but it must receive all the messages it has missed while being offline when it connects to BlazeDS again. (Of course the Flex client connects with the same client ID every time).
It can not be guaranteed that the Flex client is shut down gracefully.
Everything works fine if I explicitly disconnect my consumer on the Flex side by calling disconnect()
- I do it in the exit handler of the application. However, due to #3 above, it is not guaranteed that disconnect()
is called all the time. When the Flex client shuts down without calling disconnect()
, it seems that the subscription of the "proxy JMS client" that BlazeDS creates and associates to the Flex client stays active towards ActiveMQ, so ActiveMQ still thinks that the client is logged in. When the Flex app starts up the next time, it is unable to log in to BlazeDS because ActiveMQ refuses its subscription, claiming that the client ID is already taken. Why is it so and what can I do here to ensure that BlazeDS makes the "proxy JMS client" offline in ActiveMQ when its real Flex counterpart terminates unexpectedly?
More detailed information: some debugging revealed that:
BlazeDS becomes aware of the termination of the Flex client because it prints a few exceptions to the console when in debug mode. The messages are as follows:
[BlazeDS]23:18:13.688 [WARN] Endpoint with id 'my-streaming-amf' is closing the streaming connection to FlexClient with id '71E6466F-D91F-201C-F60A-A6CB52F95D9F' because endpoint encountered a socket write error, possibly due to an unresponsive FlexClient. ClientAbortException: java.net.SocketException: Broken pipe at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:319) at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288) at org.apache.catalina.connector.Response.flushBuffer(Response.java:542) at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:279) at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.handleFlexClientStreamingOpenRequest(BaseStreamingHTTPEndpoint.java:818) at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.serviceStreamingRequest(BaseStreamingHTTPEndpoint.java:1055) at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.service(BaseStreamingHTTPEndpoint.java:460) at flex.messaging.MessageBrokerServlet.service(MessageBrokerServlet.java:353) at javax.servlet.http.HttpServlet.service(HttpServlet.java:803) at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290) at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233) at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175) at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128) at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102) at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109) at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263) at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844) at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:584) at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447) at java.lang.Thread.run(Thread.java:680) Caused by: java.net.SocketException: Broken pipe at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92) at java.net.SocketOutputStream.write(SocketOutputStream.java:136) at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:737) at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434) at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299) at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:963) at org.apache.coyote.Response.action(Response.java:183) at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314) ... 20 more [BlazeDS]23:18:13.689 [DEBUG] Streaming thread 'http-8400-1' for endpoint with id 'my-streaming-amf' is releasing connection and returning to the request handler pool. [BlazeDS]23:18:13.689 [INFO] Number of streaming clients for FlexSession with id '5BC5E8D604A361BCA673B05AC624CCC1' is 0. [BlazeDS]23:18:13.689 [DEBUG] Number of streaming clients for endpoint with id 'my-streaming-amf' is 0.
At this stage, the subscriptions are still shown on the ActiveMQ web admin interface as being active.
Killing BlazeDS (more precisely, the Tomcat server that hosts it) with
kill -9
from the console makes ActiveMQ realize immediately that the "proxy JMS client" is gone and it becomes offline on the ActiveMQ web admin interface. This made me conclude that BlazeDS is keeping the proxy JMS client alive explicitly sincekill -9
gives no chance to BlazeDS to unsubscribe the client but it still becomes offline in ActiveMQ.
So, the question once again: What can I do here to ensure that BlazeDS makes the "proxy JMS client" offline in ActiveMQ when its real Flex counterpart terminates unexpectedly? Is this a bug in BlazeDS or am I just missing some hidden configuration setting that would make it work?
Version information: BlazeDS 4.0, ActiveMQ 5.5.0, both freshly downloaded today. I'm using the Tomcat server in the BlazeDS turnkey but ActiveMQ is installed separately because the BlazeDS turnkey ships with ActiveMQ 4.1.1 only. By the way, that version of ActiveMQ has the same issue.