We're experiencing an issue with one of our MQTT subscribers in Spring integration (4.0.3.RELEASE running on Tomcat 7 with the Paho MQTT Client 0.4.0).
The issue is with a subscriber on a heavily used topic (lots of messages). The devices sending the messages to the topic are devices in the field connecting over GPRS.
Spring integration and the broker (Mosquitto) are running on the same server.
The issue seems to appear after doing a couple of redeploys on the Tomcat without restarting the server. When the issue occurs, a restart of the tomcat instance fixes it for a while.
Here's the chain of events (from the mosquitto logs. The vdm-dev-live
subscriber is the one with the issues):
When starting spring integration, we see all subscribers connecting to the various topics:
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-live (c1, k60).
1409645645: Sending CONNACK to vdm-dev-live (0)
1409645645: Received SUBSCRIBE from vdm-dev-live
1409645645: vdm/+/+/+/liveData (QoS 1)
1409645645: Sending SUBACK to vdm-dev-live
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmReq
1409645645: vdm/+/+/+/firmware/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-cfgReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-cfgReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-cfgReq
1409645645: vdm/+/+/+/config/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-cfgReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmStat (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmStat (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmStat
1409645645: vdm/+/+/firmware/status (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmStat
We see messages going back and forth
1409645646: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
We see the ping requests from the various subscribers
1409645705: Received PINGREQ from vdm-dev-update
1409645705: Sending PINGRESP to vdm-dev-update
1409645705: Received PINGREQ from vdm-dev-live
1409645705: Sending PINGRESP to vdm-dev-live
1409645705: Received PINGREQ from vdm-dev-fmReq
1409645705: Sending PINGRESP to vdm-dev-fmReq
1409645705: Received PINGREQ from vdm-dev-cfgReq
1409645705: Sending PINGRESP to vdm-dev-cfgReq
1409645705: Received PINGREQ from vdm-dev-fmStat
1409645705: Sending PINGRESP to vdm-dev-fmStat
But all of a sudden we see this:
1409645776: Socket error on client vdm-dev-live, disconnecting.
And at that point the subscriber is dead. We're not seeing any ping requests and it is no longer processing any messages from that topic. On a broker level everything is still fine, as I have debug-log subscribers (using NodeJS) where I see that those subscribers are still processing the messages from that topic just fine (so the issue is on the subscriber level).
In the tomcat logs we also see this:
Sep 02, 2014 10:16:05 AM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
SEVERE: vdm-dev-live: Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,409,645,705,714 lastInboundActivity=1,409,645,755,075
But Paho doesn't do any cleanup / restart of this subscriber.
I'm also seeing this in the Tomcat logs:
SEVERE: The web application [/vdmapp] appears to have started a thread named [MQTT Snd: vdm-dev-live] but has failed to stop it. This is very likely to create a memory leak.
I also noticed a lot of threads for that subscriber that are stuck while doing a shutdown.
"MQTT Snd: vdm-dev-live" daemon prio=10 tid=0x00007f1b44781800 nid=0x3061 in Object.wait() [0x00007f1aa7bfa000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1258)
- locked <0x00000007ab13e218> (a java.lang.Thread)
at java.lang.Thread.join(Thread.java:1332)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.stop(CommsReceiver.java:77)
- locked <0x00000007ab552730> (a java.lang.Object)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:294)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.handleRunException(CommsSender.java:154)
at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:131)
at java.lang.Thread.run(Thread.java:722)
Any idea what is causing this and how this can be prevented ?