1
votes

I am trying to implement Spring integration with MQTT. I am using Mosquitto as MQTT broker. with the reference to docs provided in the following link. I have created a project and added all the required jar files. when i execute MQTTJavaApplication.

public class MqttJavaApplication {

     public static void main(String[] args) {
            new SpringApplicationBuilder(MqttJavaApplication.class)
                    .web(false)
                    .run(args);
        }

        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }

        @Bean
        public MqttPahoMessageDrivenChannelAdapter inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "test",
                                                     "sample");
            adapter.setCompletionTimeout(5000);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttInputChannel());
            return adapter;
        }

        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {

                @Override
                public void handleMessage(Message<?> message) {
                    System.out.println("Test##########"+message.getPayload());
                }

            };
        }
}

I am getting following error when i publish a message via MQTT broker.

[2015-11-23 10:08:19.545] boot - 8100  INFO [main] --- AnnotationConfigApplicationContext: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@2d0e1c: startup date [Mon Nov 23 10:08:19 IST 2015]; root of context hierarchy
[2015-11-23 10:08:19.831] boot - 8100  INFO [main] --- PropertiesFactoryBean: Loading properties file from URL [jar:file:/D:/IoTWorkspace/MQTTTest/WebContent/WEB-INF/lib/spring-integration-core-4.2.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
[2015-11-23 10:08:20.001] boot - 8100  INFO [main] --- DefaultLifecycleProcessor: Starting beans in phase 1073741823
[2015-11-23 10:08:20.104] boot - 8100  INFO [main] --- MqttPahoMessageDrivenChannelAdapter: started inbound
[2015-11-23 10:08:20.112] boot - 8100  INFO [main] --- MqttJavaApplication: Started MqttJavaApplication in 1.602 seconds (JVM running for 2.155)
[2015-11-23 10:13:04.564] boot - 8100 ERROR [MQTT Call: test] --- MqttPahoMessageDrivenChannelAdapter: Unhandled exception for GenericMessage [payload=Testing Subscription, headers={timestamp=1448253784563, id=cd5be974-3b19-8317-47eb-1c139725be24, mqtt_qos=0, mqtt_topic=sample, mqtt_retained=false, mqtt_duplicate=false}]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.messageArrived(MqttPahoMessageDrivenChannelAdapter.java:262)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:336)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:148)
    at java.lang.Thread.run(Thread.java:722)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    ... 10 more
[2015-11-23 10:13:04.613] boot - 8100 ERROR [MQTT Call: test] --- MqttPahoMessageDrivenChannelAdapter: Lost connection:MqttException; retrying...
[2015-11-23 10:13:04.614] boot - 8100 ERROR [MQTT Call: test] --- MqttPahoMessageDrivenChannelAdapter: Failed to schedule reconnect
java.lang.NullPointerException
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.scheduleReconnect(MqttPahoMessageDrivenChannelAdapter.java:228)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectionLost(MqttPahoMessageDrivenChannelAdapter.java:255)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost(CommsCallback.java:229)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:339)
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:171)
    at java.lang.Thread.run(Thread.java:722)
[2015-11-23 10:13:04.617] boot - 8100  INFO [Thread-0] --- AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2d0e1c: startup date [Mon Nov 23 10:08:19 IST 2015]; root of context hierarchy
[2015-11-23 10:13:04.619] boot - 8100  INFO [Thread-0] --- DefaultLifecycleProcessor: Stopping beans in phase 1073741823
[2015-11-23 10:13:04.622] boot - 8100 ERROR [Thread-0] --- MqttPahoMessageDrivenChannelAdapter: Exception while unsubscribing
Client is not connected (32104)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:132)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.unsubscribe(MqttAsyncClient.java:707)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.unsubscribe(MqttAsyncClient.java:682)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStop(MqttPahoMessageDrivenChannelAdapter.java:124)
    at org.springframework.integration.endpoint.AbstractEndpoint.doStop(AbstractEndpoint.java:145)
    at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:128)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
    at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
    at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:118)
    at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:966)
    at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:893)
[2015-11-23 10:13:04.623] boot - 8100 ERROR [Thread-0] --- MqttPahoMessageDrivenChannelAdapter: Exception while disconnecting
Client is disconnected (32101)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:27)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.disconnect(ClientComms.java:405)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:524)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:493)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:500)
    at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.doStop(MqttPahoMessageDrivenChannelAdapter.java:131)
    at org.springframework.integration.endpoint.AbstractEndpoint.doStop(AbstractEndpoint.java:145)
    at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:128)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
    at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
    at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:118)
    at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:966)
    at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:893)
[2015-11-23 10:13:04.624] boot - 8100  INFO [Thread-0] --- MqttPahoMessageDrivenChannelAdapter: stopped inbound

Please find below stack trace after adding @SpringBootApplication Annotation

[2015-11-26 10:55:46.571] boot - 5524  INFO [main] --- AnnotationConfigApplicationContext: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@562d4b: startup date [Thu Nov 26 10:55:46 IST 2015]; root of context hierarchy

[2015-11-26 10:55:46.572] boot - 5524 WARN [main] --- AnnotationConfigApplicationContext: Exception thrown from ApplicationListener handling ContextClosedEvent java.lang.IllegalStateException: ApplicationEventMulticaster not initialized - call 'refresh' before multicasting events via the context: org.springframework.context.annotation.AnnotationConfigApplicationContext@562d4b: startup date [Thu Nov 26 10:55:46 IST 2015]; root of context hierarchy at org.springframework.context.support.AbstractApplicationContext.getApplicationEventMulticaster(AbstractApplicationContext.java:337) at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:324) at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1025) at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:988) at org.springframework.boot.SpringApplication.run(SpringApplication.java:329) at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:130) at com.igate.MqttJavaApplication.main(MqttJavaApplication.java:32) [2015-11-26 10:55:46.594] boot - 5524 WARN [main] --- AnnotationConfigApplicationContext: Exception thrown from LifecycleProcessor on context close java.lang.IllegalStateException: LifecycleProcessor not initialized - call 'refresh' before invoking lifecycle methods via the context: org.springframework.context.annotation.AnnotationConfigApplicationContext@562d4b: startup date [Thu Nov 26 10:55:46 IST 2015]; root of context hierarchy at org.springframework.context.support.AbstractApplicationContext.getLifecycleProcessor(AbstractApplicationContext.java:350) at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1033) at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:988) at org.springframework.boot.SpringApplication.run(SpringApplication.java:329) at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:130) at com.igate.MqttJavaApplication.main(MqttJavaApplication.java:32) Exception in thread "main" java.lang.IllegalStateException: At least one base package must be specified at org.springframework.context.annotation.ComponentScanAnnotationParser.parse(ComponentScanAnnotationParser.java:121) at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:214) at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:149) at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:135) at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:260) at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:203) at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:617) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:446) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:648) at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:130) at com.igate.MqttJavaApplication.main(MqttJavaApplication.java:32)

2
Sorry, do you really have there @SpringBootApplication?Artem Bilan
I am new to springs integration. please provide a sample project for integrating springs with mqtt if available.Avinash

2 Answers

0
votes

Yep! That's true. Without @SpringBootApplication I have the same StackTrace.

The Spring Boot stuff includes IntegrationAutoConfiguration who is responsible for subscribers and TaskScheduler bean population.

0
votes

Probably too late to answer but hope it might help some beginner.

Few Things:-

  • Declare @SpringBootApplication in your main class to let the project know its a spring boot application
  • For the 2nd Error, Define base package of the project where spring boot will refer for configuration files etc. To do so, add @ComponentScan({"package-name"}) at class level.

Sample Example:-

@SpringBootApplication
@ComponentScan({"package-name"})
public class Main {
...
}