
I'm learning the Java Spring Framework and have started experimenting with Spring Integration. I'm trying to use it to connect my application to an MQTT broker, both to publish and to subscribe to messages, but I'm having trouble finding a way to manually publish messages to an outbound channel. If possible, I want to build it exclusively with annotations in the Java code rather than XML files defining beans and other related configuration.

In every example I've seen, the way to manually publish a message is to define a MessagingGateway interface, use SpringApplicationBuilder to obtain the ConfigurableApplicationContext, and get a reference to the gateway from that context in the main method. The reference is then used to publish a message. Would it be possible to inject the interface with @Autowired instead? In my attempts I just get a NullPointerException.

My aim is to build a game where I subscribe to a topic to get game messages and then whenever the user is ready to make the next move, publish a new message to the topic.

Update: This is one of the examples I've been looking at for how to set up an outbound channel: https://docs.spring.io/spring-integration/reference/html/mqtt.html

Update 2, after the answer from Gary Russell:

This is some example code I wrote after looking at examples. It throws a NullPointerException at gateway.sendToMqtt in Controller.java when the Gateway is injected with @Autowired. What I want to achieve is to send an MQTT message manually when the controller handles a GET request.

Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}

Controller.java

@RestController
@RequestMapping("/publishMessage")
public class Controller {

    @Autowired
    static Gateway gateway;

    @RequestMapping(method = RequestMethod.GET)
    public int request(){
        gateway.sendToMqtt("Test Message!");
        return 0;
    }
}

MqttPublisher.java

@EnableIntegration
@Configuration
public class MqttPublisher {
    @Bean
    public MqttPahoClientFactory mqttClientFactory(){
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(){
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("clientPublisher", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("topic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel(){
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface Gateway {

        void sendToMqtt(String data);
    }
}

Update:

I'm not sure this is the proper logging, but it is what I get from adding:

logging.level.org.springframework.web=Debug
logging.level.org.hibernate=Error

to application.properties.

https://hastebin.com/cuvonufeco.hs

If you have a problem with your code, you need to post your code as a minimal reproducible example with a good description of the problem and where exactly in your code it occurs. - Erwin Bolwidt
@ErwinBolwidt: This is not really a problem with my code but rather a question of how and if this can be done. - Jimmy
How does it work if you remove static from the Gateway property? - Artem Bilan
@ArtemBilan: If I remove static, my IDE complains that "No beans of 'Gateway' type found". - Jimmy
Well, that's just an IDE issue. You don't have it at runtime, especially in production. - Artem Bilan

1 Answer


Use a Messaging Gateway or simply send a message to the channel.
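
As a rough sketch of the second option, sending straight to the channel could look something like the following. The class name DirectChannelSender is hypothetical, and the sketch assumes a MessageChannel bean named "toMqtt" is declared explicitly (for example a DirectChannel @Bean) so that it can be injected; the MqttHeaders.TOPIC header picks the topic for that one message.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

// Hypothetical helper; not part of the original answer.
@Component
public class DirectChannelSender {

    private final MessageChannel toMqtt;

    // Assumption: a MessageChannel bean named "toMqtt" is declared explicitly,
    // e.g. @Bean public MessageChannel toMqtt() { return new DirectChannel(); }
    public DirectChannelSender(@Qualifier("toMqtt") MessageChannel toMqtt) {
        this.toMqtt = toMqtt;
    }

    // Builds a message with the payload and a topic header, then sends it
    // to the channel that feeds the MQTT outbound handler.
    public void send(String topic, String payload) {
        toMqtt.send(MessageBuilder.withPayload(payload)
                .setHeader(MqttHeaders.TOPIC, topic)
                .build());
    }
}
```

The gateway does essentially the same thing behind an interface, so which one to use is mostly a matter of whether you want your calling code to know about messages and channels.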

EDIT

@SpringBootApplication
public class So47846492Application {

    public static void main(String[] args) {
        SpringApplication.run(So47846492Application.class, args).close();
    }

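    @Bean
    public ApplicationRunner runner(MyGate gate) {
        return args -> {
            // The gateway proxy is injected like any other bean
            gate.send("someTopic", "foo");
            // Give the inbound adapter time to receive and print the message
            Thread.sleep(5_000);
        };
    }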

    @Bean
    @ServiceActivator(inputChannel = "toMqtt")
    public MqttPahoMessageHandler mqtt() {
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler("tcp://localhost:1883", "foo",
                clientFactory());
        handler.setDefaultTopic("myTopic");
        handler.setQosExpressionString("1");
        return handler;
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setUserName("guest");
        factory.setPassword("guest");
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttIn() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "bar", "someTopic");
        adapter.setOutputChannelName("fromMqtt");
        return adapter;
    }

    @ServiceActivator(inputChannel = "fromMqtt")
    public void in(String in) {
        System.out.println(in);
    }

    @MessagingGateway(defaultRequestChannel = "toMqtt")
    public interface MyGate {

        // The topic header overrides the handler's default topic for this message
        void send(@Header(MqttHeaders.TOPIC) String topic, String out);

    }

}
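
For the controller in the question, something along these lines should work. Spring does not inject static fields, which is why the static gateway stays null; a non-static field, or constructor injection as below, avoids that. The class name PublishController is hypothetical, and the sketch assumes it lives in the same package as So47846492Application, that spring-boot-starter-web is on the classpath, and that the .close() call in main is removed so the application keeps running to serve requests.

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller; assumes the MyGate gateway from the code above.
@RestController
public class PublishController {

    // Non-static: Spring skips static fields when autowiring
    private final So47846492Application.MyGate gate;

    // Constructor injection; no @Autowired needed with a single constructor
    public PublishController(So47846492Application.MyGate gate) {
        this.gate = gate;
    }

    // Publishes one MQTT message each time GET /publishMessage is handled
    @GetMapping("/publishMessage")
    public String publish() {
        gate.send("someTopic", "Test Message!");
        return "sent";
    }
}
```

If the IDE still reports that no bean of the gateway type exists, that is only an inspection warning: the proxy for a @MessagingGateway interface is created at runtime.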