
We're experiencing an issue with one of our MQTT subscribers in Spring Integration (4.0.3.RELEASE running on Tomcat 7 with the Paho MQTT client 0.4.0).

The issue is with a subscriber on a heavily used topic (lots of messages). The messages are published by devices in the field connecting over GPRS.

Spring Integration and the broker (Mosquitto) are running on the same server.
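
For reference, the vdm-dev-live subscriber is wired roughly like this (a simplified Java sketch; the channel and class names are illustrative, and each of the other subscribers has its own adapter and client id):

    import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
    import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
    import org.springframework.messaging.MessageChannel;

    public class LiveDataSubscriberConfig {

        // One inbound adapter per topic; this one corresponds to the vdm-dev-live client below.
        public MqttPahoMessageDrivenChannelAdapter liveDataAdapter(MessageChannel liveDataChannel) {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(
                            "tcp://localhost:1873",    // Mosquitto runs on the same server, port 1873
                            "vdm-dev-live",            // client id as it appears in the Mosquitto log
                            "vdm/+/+/+/liveData");     // subscribed topic filter
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setOutputChannel(liveDataChannel); // hands messages to the downstream flow
            return adapter;
        }
    }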

The issue seems to appear after a couple of redeploys of the web application on Tomcat without restarting the server. When it occurs, restarting the Tomcat instance fixes it for a while.

Here's the chain of events, taken from the Mosquitto logs (the vdm-dev-live subscriber is the one with the issue):

When Spring Integration starts, we see all subscribers connecting to their topics:

1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-live (c1, k60).
1409645645: Sending CONNACK to vdm-dev-live (0)
1409645645: Received SUBSCRIBE from vdm-dev-live
1409645645:     vdm/+/+/+/liveData (QoS 1)
1409645645: Sending SUBACK to vdm-dev-live
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmReq
1409645645:     vdm/+/+/+/firmware/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-cfgReq (c1, k60).
1409645645: Sending CONNACK to vdm-dev-cfgReq (0)
1409645645: Received SUBSCRIBE from vdm-dev-cfgReq
1409645645:     vdm/+/+/+/config/request (QoS 1)
1409645645: Sending SUBACK to vdm-dev-cfgReq
1409645645: New connection from xxx.xx.xx.xxx on port 1873.
1409645645: New client connected from xxx.xx.xx.xxx as vdm-dev-fmStat (c1, k60).
1409645645: Sending CONNACK to vdm-dev-fmStat (0)
1409645645: Received SUBSCRIBE from vdm-dev-fmStat
1409645645:     vdm/+/+/firmware/status (QoS 1)
1409645645: Sending SUBACK to vdm-dev-fmStat

We see messages going back and forth:

1409645646: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645646: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645648: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Received PUBLISH from 89320292400015932480 (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to vdm-dev-live (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to Yo3zC8ou5y (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))
1409645650: Sending PUBLISH to mqttjs_31f1e3f7cd0e0aed (d0, q0, r0, m0, 'vdm/89320292400015932480/WVWZZZ1KZDP005350/4.2/liveData', ... (36 bytes))

We see the ping requests from the various subscribers:

1409645705: Received PINGREQ from vdm-dev-update
1409645705: Sending PINGRESP to vdm-dev-update
1409645705: Received PINGREQ from vdm-dev-live
1409645705: Sending PINGRESP to vdm-dev-live
1409645705: Received PINGREQ from vdm-dev-fmReq
1409645705: Sending PINGRESP to vdm-dev-fmReq
1409645705: Received PINGREQ from vdm-dev-cfgReq
1409645705: Sending PINGRESP to vdm-dev-cfgReq
1409645705: Received PINGREQ from vdm-dev-fmStat
1409645705: Sending PINGRESP to vdm-dev-fmStat

But all of a sudden we see this:

1409645776: Socket error on client vdm-dev-live, disconnecting.

At that point the subscriber is dead: we no longer see any ping requests from it, and it stops processing messages from that topic. At the broker level everything is still fine; I also have debug-log subscribers (written in Node.js) on the same topic, and they keep processing the messages just fine, so the issue is on the subscriber side.

In the Tomcat logs we also see this:

Sep 02, 2014 10:16:05 AM org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
SEVERE: vdm-dev-live: Timed out as no activity, keepAlive=60,000 lastOutboundActivity=1,409,645,705,714 lastInboundActivity=1,409,645,755,075

But Paho doesn't do any cleanup or restart of this subscriber.

I'm also seeing this in the Tomcat logs:

SEVERE: The web application [/vdmapp] appears to have started a thread named [MQTT Snd: vdm-dev-live] but has failed to stop it. This is very likely to create a memory leak.

I also noticed a lot of threads for that subscriber that are stuck while shutting down:

"MQTT Snd: vdm-dev-live" daemon prio=10 tid=0x00007f1b44781800 nid=0x3061 in Object.wait() [0x00007f1aa7bfa000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1258)
    - locked <0x00000007ab13e218> (a java.lang.Thread)
    at java.lang.Thread.join(Thread.java:1332)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.stop(CommsReceiver.java:77)
    - locked <0x00000007ab552730> (a java.lang.Object)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:294)
    at org.eclipse.paho.client.mqttv3.internal.CommsSender.handleRunException(CommsSender.java:154)
    at org.eclipse.paho.client.mqttv3.internal.CommsSender.run(CommsSender.java:131)
    at java.lang.Thread.run(Thread.java:722)

Any idea what is causing this and how it can be prevented?


2 Answers

2 votes

Following up from my comments in @Artem's answer...

There appears to be a deadlock in the Paho client. See line 573 of your Gist: the Snd thread is waiting for the Rec thread to terminate. At line 586, the Rec thread is blocked because the inbound queue is full (10 entries). For all the cases that look like this, there is no Call thread, so the queue-full condition will never be cleared. Notice at line 227 that the trifecta of threads is working fine (presumably a reconnection after a redeploy?).

Wherever the threads are stuck like this, the corresponding Call thread is missing.

I think the problem is in the Paho client: in the CommsCallback.run() method there is a catch of Throwable which closes the connection, but because the queue is full the Rec thread is never notified (and so never cleans up). So it seems message delivery is throwing an exception which, when the queue is full, causes this deadlock.

The Paho client needs a fix, but in the meantime we can figure out what the exception is.

If the exception is downstream of the inbound gateway, you should see a log...

        logger.error("Unhandled exception for " + message.toString(), e);

Since this log is produced in MqttCallback.messageArrived(), if you are not seeing such errors, the problem may be in the Paho client itself.

The exception handling in CommsCallback looks like this...

        } catch (Throwable ex) {
            // Users code could throw an Error or Exception e.g. in the case
            // of class NoClassDefFoundError
            // @TRACE 714=callback threw exception
            log.fine(className, methodName, "714", null, ex);
            running = false;
            clientComms.shutdownConnection(null, new MqttException(ex));
        }

(that is where they should call spaceAvailable.notifyAll() to wake the (dying) Rec thread).
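
In other words, a fix would presumably look something like this (just a sketch against the snippet above, using the spaceAvailable monitor mentioned; untested):

    } catch (Throwable ex) {
        // @TRACE 714=callback threw exception
        log.fine(className, methodName, "714", null, ex);
        running = false;
        // Sketch: wake the Rec thread that may be blocked waiting for space in the
        // full inbound queue, so it can observe running == false and shut down cleanly.
        synchronized (spaceAvailable) {
            spaceAvailable.notifyAll();
        }
        clientComms.shutdownConnection(null, new MqttException(ex));
    }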

So, turning on FINE logging for the Paho client should tell you where and what the exception is.
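
For example, assuming the default java.util.logging backend, something like this (the logger name is assumed from the Paho package; a logging.properties entry works just as well) should surface the "714=callback threw exception" trace together with the underlying Throwable:

    import java.util.logging.ConsoleHandler;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class PahoTraceLogging {

        // Hold a strong reference so the configured logger is not garbage collected.
        private static final Logger PAHO_LOGGER =
                Logger.getLogger("org.eclipse.paho.client.mqttv3");

        // Enable FINE logging for the whole Paho client package and print it to the console.
        public static void enable() {
            PAHO_LOGGER.setLevel(Level.FINE);
            ConsoleHandler handler = new ConsoleHandler();
            handler.setLevel(Level.FINE);
            PAHO_LOGGER.addHandler(handler);
        }
    }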

1 vote

First of all, please share the versions of Spring Integration and the Paho client you are using.

Regarding "after doing a couple of redeploys": I see this code in CommsReceiver#stop():

if (!Thread.currentThread().equals(recThread)) {
    try {
        // Wait for the thread to finish.
        recThread.join();
    }
    catch (InterruptedException ex) {
    }
}

Where the Javadoc of Thread.join() says:

* Waits for this thread to die.

I'm really not sure what that means here, or how things are supposed to proceed past that wait, but couldn't the redeploy be the culprit, leaving those daemon threads alive because the thread being joined never dies?
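
To make that concrete, here is a toy illustration (not Paho code, just the same pattern as the thread dump in the question): join() blocks until the target thread terminates, so if the Rec stand-in never leaves its wait(), the Snd stand-in hangs in join() forever:

    public class JoinHangDemo {

        public static void main(String[] args) throws InterruptedException {
            final Object spaceAvailable = new Object();

            // Stand-in for the Rec thread: blocked forever because nobody calls notifyAll().
            final Thread rec = new Thread(new Runnable() {
                public void run() {
                    synchronized (spaceAvailable) {
                        try {
                            spaceAvailable.wait(); // never woken up
                        }
                        catch (InterruptedException ignored) {
                        }
                    }
                }
            }, "Rec");
            rec.setDaemon(true);
            rec.start();

            // Stand-in for the Snd thread: join() waits for Rec to die, which never happens.
            Thread snd = new Thread(new Runnable() {
                public void run() {
                    try {
                        rec.join(); // hangs here, just like CommsReceiver.stop()
                    }
                    catch (InterruptedException ignored) {
                    }
                }
            }, "Snd");
            snd.setDaemon(true);
            snd.start();

            snd.join(2000); // give it a moment, then inspect
            System.out.println("Snd state: " + snd.getState()); // prints WAITING
        }
    }

(Anonymous Runnables rather than lambdas, to match the Java 7 runtime visible in the stack trace.)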