I'm configuring Spring Integration to use cleanSession=false on one of my channels.

<bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="cleanSession" value="false" />
</bean>


<int-mqtt:message-driven-channel-adapter id="mqttLiveDataInbound"
    client-id="client1"
    url="${mqtt.broker.url}"
    qos="1"
    topics="liveData"
    client-factory="clientFactory"
    channel="channelLiveData"/> 

The reason is that I want to receive messages that are published while my application is offline. When my application restarts, it should receive any QoS > 0 messages that were published in its absence.

However, I noticed something odd: my application doesn't pick up the missed QoS > 0 messages after a downtime.

I've logged a simple scenario where

  • Spring Integration starts
  • A QoS1 message is sent to the topic and received by Spring Integration
  • Spring Integration exits
  • A QoS1 message is sent to the topic
  • Spring Integration starts
  • Spring Integration does not receive the QoS1 message that was sent while it was offline.

The reason is the following sequence on shutdown (as can be seen from the logs below):

  • Spring Integration exits
  • It unsubscribes from the topic
  • It disconnects the client

The UNSUBSCRIBE effectively tells the broker that this client is no longer interested in the topic. As a result, while my app is down the broker no longer queues QoS > 0 messages for it, even though the session itself is persistent.

When my app starts up again, it fails to receive the QoS>0 messages that were published while it was down.
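The difference between the two logs below can be illustrated with a toy model of persistent-session handling. This is only a sketch and not how Mosquitto is implemented: `ToyBroker`, `replay`, and the payload string are hypothetical names, and the model covers just one rule, namely that a QoS 1 message is queued for an offline cleanSession=false client only if that client's session still holds a matching subscription.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of a broker's persistent sessions (hypothetical, not Mosquitto). */
class ToyBroker {

    /** State the broker keeps for a cleanSession=false client across disconnects. */
    private static final class Session {
        final Set<String> topics = new HashSet<>();
        final List<String> queued = new ArrayList<>();
        boolean online;
    }

    private final Map<String, Session> sessions = new HashMap<>();

    /** Connects with cleanSession=false and returns whatever was queued while offline. */
    List<String> connect(String clientId) {
        Session s = sessions.computeIfAbsent(clientId, id -> new Session());
        s.online = true;
        List<String> backlog = new ArrayList<>(s.queued);
        s.queued.clear();
        return backlog;
    }

    void subscribe(String clientId, String topic) {
        sessions.get(clientId).topics.add(topic);
    }

    void unsubscribe(String clientId, String topic) {
        sessions.get(clientId).topics.remove(topic);
    }

    void disconnect(String clientId) {
        sessions.get(clientId).online = false;
    }

    /** QoS 1 publish; only queuing for offline subscribers is modelled here. */
    void publish(String topic, String payload) {
        for (Session s : sessions.values()) {
            if (!s.online && s.topics.contains(topic)) {
                s.queued.add(payload);
            }
        }
    }

    /** Replays the scenario from the question; returns what the client gets on restart. */
    static List<String> replay(boolean unsubscribeOnExit) {
        ToyBroker broker = new ToyBroker();
        broker.connect("client1");
        broker.subscribe("client1", "liveData");
        if (unsubscribeOnExit) {
            broker.unsubscribe("client1", "liveData");
        }
        broker.disconnect("client1");
        broker.publish("liveData", "sent while offline");
        return broker.connect("client1");
    }

    public static void main(String[] args) {
        System.out.println(replay(true));  // prints []
        System.out.println(replay(false)); // prints [sent while offline]
    }
}
```

In this model, `replay(true)` corresponds to the Spring Integration log (UNSUBSCRIBE, then DISCONNECT) and `replay(false)` to the Mosquitto client log (disconnect only).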

1448917620: New connection from 127.0.0.1 on port 1883.
1448917620: New client connected from 127.0.0.1 as client1 (c0, k60).
1448917620: Sending CONNACK to client1 (0, 0)
1448917620: Received SUBSCRIBE from client1
1448917620:     liveData (QoS 1)
1448917620: Sending SUBACK to client1
1448917632: New connection from ::1 on port 1883.
1448917632: New client connected from ::1 as mosqpub/25936-MacBook-P (c1, k60, u'system').
1448917632: Sending CONNACK to mosqpub/25936-MacBook-P (0, 0)
1448917632: Received PUBLISH from mosqpub/25936-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917632: Sending PUBACK to mosqpub/25936-MacBook-P (Mid: 1)
1448917632: Sending PUBLISH to client1 (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917632: Received DISCONNECT from mosqpub/25936-MacBook-P
1448917632: Client mosqpub/25936-MacBook-P disconnected.
1448917633: Received PUBACK from client1 (Mid: 1)
1448917643: Received UNSUBSCRIBE from client1
1448917643:     liveData
1448917643: Received DISCONNECT from client1
1448917643: Client client1 disconnected.
1448917648: New connection from ::1 on port 1883.
1448917648: New client connected from ::1 as mosqpub/25945-MacBook-P (c1, k60, u'system').
1448917648: Sending CONNACK to mosqpub/25945-MacBook-P (0, 0)
1448917648: Received PUBLISH from mosqpub/25945-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917648: Sending PUBACK to mosqpub/25945-MacBook-P (Mid: 1)
1448917648: Received DISCONNECT from mosqpub/25945-MacBook-P
1448917648: Client mosqpub/25945-MacBook-P disconnected.
1448917665: New connection from 127.0.0.1 on port 1883.
1448917665: Client client1 disconnected.
1448917665: New client connected from 127.0.0.1 as client1 (c0, k60).
1448917665: Sending CONNACK to client1 (1, 0)
1448917665: Received SUBSCRIBE from client1
1448917665:     liveData (QoS 1)
1448917665: Sending SUBACK to client1

I ran the same scenario using the Mosquitto client tools. There, exiting the subscriber disconnects the client but does not unsubscribe from the topic, and the message published during the downtime is delivered on reconnect:

1448917534: New connection from ::1 on port 1883.
1448917534: New client connected from ::1 as client1 (c0, k60).
1448917534: Sending CONNACK to client1 (0, 0)
1448917534: Received SUBSCRIBE from client1
1448917534:     liveData (QoS 1)
1448917534: Sending SUBACK to client1
1448917550: New connection from ::1 on port 1883.
1448917550: New client connected from ::1 as mosqpub/25879-MacBook-P (c1, k60, u'system').
1448917550: Sending CONNACK to mosqpub/25879-MacBook-P (0, 0)
1448917550: Received PUBLISH from mosqpub/25879-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917550: Sending PUBACK to mosqpub/25879-MacBook-P (Mid: 1)
1448917550: Sending PUBLISH to client1 (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917550: Received DISCONNECT from mosqpub/25879-MacBook-P
1448917550: Client mosqpub/25879-MacBook-P disconnected.
1448917550: Received PUBACK from client1 (Mid: 1)
1448917553: Socket error on client client1, disconnecting.
1448917554: New connection from ::1 on port 1883.
1448917554: New client connected from ::1 as mosqpub/25884-MacBook-P (c1, k60, u'system').
1448917554: Sending CONNACK to mosqpub/25884-MacBook-P (0, 0)
1448917554: Received PUBLISH from mosqpub/25884-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917554: Sending PUBACK to mosqpub/25884-MacBook-P (Mid: 1)
1448917554: Received DISCONNECT from mosqpub/25884-MacBook-P
1448917555: Client mosqpub/25884-MacBook-P disconnected.
1448917556: New connection from ::1 on port 1883.
1448917556: Client client1 disconnected.
1448917556: New client connected from ::1 as client1 (c0, k60).
1448917556: Sending CONNACK to client1 (0, 0)
1448917556: Sending PUBLISH to client1 (d0, q1, r0, m2, 'liveData', ... (68 bytes))
1448917556: Received SUBSCRIBE from client1
1448917556:     liveData (QoS 1)
1448917556: Sending SUBACK to client1
1448917556: Received PUBACK from client1 (Mid: 2)

Any idea how to deal with this situation?

EDIT:

When implementing the workaround proposed in the accepted answer, I get the following error. My Spring context is loaded from a webapp. I've tried putting the IgnoreUnsubscribePahoClientFactory in a separate JAR (at the same level as spring-integration / paho) as well as in the webapp classes themselves.

2015-12-02 15:47:43,703 ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class [class org.eclipse.paho.client.mqttv3.MqttAsyncClient]: Common causes of this problem include using a final class or a non-visible class; nested exception is org.springframework.cglib.core.CodeGenerationException: java.lang.reflect.InvocationTargetException-->null
        at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:206)
        at org.springframework.aop.framework.ProxyFactoryBean.getProxy(ProxyFactoryBean.java:368)
        at org.springframework.aop.framework.ProxyFactoryBean.getSingletonInstance(ProxyFactoryBean.java:322)
        at org.springframework.aop.framework.ProxyFactoryBean.getObject(ProxyFactoryBean.java:246)
        at com.ecs.vdm.rest.integration.IgnoreUnsubscribePahoClientFactory.proxy(IgnoreUnsubscribePahoClientFactory.java:62)
        at com.ecs.vdm.rest.integration.IgnoreUnsubscribePahoClientFactory.getAsyncClientInstance(IgnoreUnsubscribePahoClientFactory.java:43)
        at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe(MqttPahoMessageDrivenChannelAdapter.java:216)
        at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.access$300(MqttPahoMessageDrivenChannelAdapter.java:45)
        at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter$1.run(MqttPahoMessageDrivenChannelAdapter.java:272)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.cglib.core.CodeGenerationException: java.lang.reflect.InvocationTargetException-->null
        at org.springframework.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:237)
        at org.springframework.cglib.proxy.Enhancer.createHelper(Enhancer.java:377)
        at org.springframework.cglib.proxy.Enhancer.createClass(Enhancer.java:317)
        at org.springframework.aop.framework.ObjenesisCglibAopProxy.createProxyClassAndInstance(ObjenesisCglibAopProxy.java:57)
        at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:202)
        ... 16 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.springframework.cglib.core.ReflectUtils.defineClass(ReflectUtils.java:384)
        at org.springframework.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:219)
        ... 20 more
Caused by: java.lang.SecurityException: class "org.eclipse.paho.client.mqttv3.MqttAsyncClient$$EnhancerBySpringCGLIB$$d14754a9_4603"'s signer information does not match signer information of other classes in the same package
        at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
        at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
        ... 25 more
1 Answer

It's a bug: the adapter unconditionally unsubscribes during stop().

I don't see a simple workaround, but I have a couple of ideas; I'll post here if/when I have something.

In the meantime, please open a JIRA Issue.

EDIT

Gist Here

It's a bit of a sledgehammer, but it should work for you; it effectively ignores the call to unsubscribe on the client. It could be made a little more sophisticated, to ignore the call only when the QoS is > 0, but that would be quite a bit more involved.

If you're already using the DefaultMqttPahoClientFactory, just change the bean class to this one. If you're not currently using a factory, declare it as a bean and provide it to the adapter using the client-factory attribute.
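The gist itself is not reproduced here. As a rough illustration of the "ignore unsubscribe" idea only, here is a minimal sketch using a JDK dynamic proxy (`java.lang.reflect.Proxy`) instead of the CGLIB-based `ProxyFactoryBean` that appears in the stack trace in the question. `ToyClient`, `RecordingClient`, and `ignoringUnsubscribe` are hypothetical names and not part of Paho or Spring Integration. Whether a real client factory can return an interface-typed proxy depends on the Spring Integration and Paho versions in use, so treat that as an assumption.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class IgnoreUnsubscribeSketch {

    /** Hypothetical stand-in for a client interface; NOT the Paho API. */
    interface ToyClient {
        void subscribe(String topic);
        void unsubscribe(String topic);
        void disconnect();
    }

    /** Records the calls that actually reach the underlying client. */
    static final class RecordingClient implements ToyClient {
        final List<String> calls = new ArrayList<>();

        @Override
        public void subscribe(String topic) {
            calls.add("SUBSCRIBE " + topic);
        }

        @Override
        public void unsubscribe(String topic) {
            calls.add("UNSUBSCRIBE " + topic);
        }

        @Override
        public void disconnect() {
            calls.add("DISCONNECT");
        }
    }

    /** Wraps a client so unsubscribe() is swallowed and every other call is delegated. */
    static ToyClient ignoringUnsubscribe(ToyClient target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("unsubscribe")) {
                return null; // drop the call; the method returns void
            }
            // Note: exceptions from the target arrive wrapped in InvocationTargetException.
            return method.invoke(target, args);
        };
        return (ToyClient) Proxy.newProxyInstance(
                ToyClient.class.getClassLoader(),
                new Class<?>[] {ToyClient.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingClient real = new RecordingClient();
        ToyClient client = ignoringUnsubscribe(real);
        client.subscribe("liveData");
        client.unsubscribe("liveData"); // ignored by the proxy
        client.disconnect();
        System.out.println(real.calls); // prints [SUBSCRIBE liveData, DISCONNECT]
    }
}
```

A JDK proxy implements an interface rather than generating a subclass of the target class, so it might sidestep the signer check shown in the stack trace, but that has not been verified against Paho here.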

We'll fix it properly in an upcoming release.