I have a basic MQTTListener class in Python which listens for messages on certain topics and should start or stop an async process imported from another script. This process runs forever, unless it is manually stopped. Let's assume the Listener looks like this:

import json
import logging
import paho.mqtt.client as mqtt
from python_file import async_forever_function

class MqttListener:

    def __init__(self, host, port, client_id):
        self.host = host
        self.port = port
        self.client_id = client_id
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.connect(host=self.host, port=self.port)

    def on_connect(self, client, userdata, flags, rc):
        self.client.subscribe(topic=[("start", 1), ])
        self.client.subscribe(topic=[("stop", 1), ])
        logging.info(msg="MQTT - connected!")

    def on_disconnect(self, client, userdata, rc):
        logging.info(msg="MQTT - disconnected!")

    def on_message(self, client, userdata, message, ):
        print('PROCESSING MESSAGE', message.topic, message.payload.decode('utf-8'), )

        if message.topic == 'start':
            async_forever_function(param='start')
            print('process started')
        else:
            async_forever_function(param='stop')
            print('process removed')

    def start(self):
        # Bound methods already have the signature paho expects, so no lambdas are needed.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

Now, this works for starting a new async process: async_forever_function is correctly triggered when a message is posted on the start MQTT topic. However, once the process is running, the listener can no longer receive or process messages on the stop MQTT topic, so the process keeps running forever when it should have been stopped.

My question: how can I adapt the code of this class such that it can also process messages when an active async process is running in the background?

Is process started printed when the start message arrived? - hardillb
Edit the question to show the output so we can see what's logged. - hardillb
@hardillb I'm sorry, I misread your comment. No, process started is not printed, because the async process is still running. - Riley

1 Answer

You cannot run blocking tasks in the on_message() callback.

This callback runs on the MQTT client thread (the one started by the loop_start() function). That thread handles all network traffic and message dispatch, so while it is blocked it can't do any of that, including receiving the stop message.

If you want to call long-running tasks from the on_message() callback, you need to start a new thread for the long-running task so it doesn't block the MQTT client loop.
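
As a rough illustration of that idea, here is a minimal sketch using only the standard library. The names run_forever and ProcessController are hypothetical and not part of paho or of the question's code. run_forever stands in for async_forever_function, and it assumes the real job can check a threading.Event to know when to stop. If the real function is a coroutine, the thread target would probably need to wrap it in asyncio.run(...) instead.

```python
import threading


def run_forever(stop_event):
    """Hypothetical stand-in for the long-running job; loops until asked to stop."""
    while not stop_event.is_set():
        # One unit of real work would go here.
        # wait() doubles as a sleep that ends early once the event is set.
        stop_event.wait(timeout=0.1)


class ProcessController:
    """Starts and stops the worker on its own thread so callers return quickly."""

    def __init__(self, target=run_forever):
        self._target = target
        self._thread = None
        self._stop_event = threading.Event()
        self._lock = threading.Lock()

    def is_running(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        with self._lock:
            if self.is_running():
                return False  # ignore a duplicate "start"
            # Fresh event per run, so an earlier stop does not cancel the new worker.
            self._stop_event = threading.Event()
            self._thread = threading.Thread(
                target=self._target, args=(self._stop_event,), daemon=True
            )
            self._thread.start()
            return True

    def stop(self, join_timeout=None):
        with self._lock:
            if not self.is_running():
                return False
            self._stop_event.set()  # only signals; does not wait by default
            if join_timeout is not None:
                self._thread.join(join_timeout)
            return True

    def on_message(self, client, userdata, message):
        """Same signature as paho's on_message callback; never blocks."""
        if message.topic == "start":
            self.start()
        elif message.topic == "stop":
            self.stop()
```

With something like this, the listener could assign controller.on_message to self.client.on_message, or call controller.start() and controller.stop() from its own on_message. Because stop() only sets the event by default, the MQTT thread is never held up waiting for the worker to finish. One consequence is that a start arriving immediately after a stop may be ignored if the old worker has not exited yet.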