1
votes

A bit of a overview of what I am doing is publishing sensor data from a Raspbery Pi to AWS which stores the data to DynamoDB and invokes a lambda function. This lambda function then publishes a message to a topic subscribed by the raspberry Pi.

So my issue is a callback is not being called, so I can not access the message published from AWS lambda. I verified this message is being published to a topic subscribed by the RaspberryPi on AWSIoT test. I am using AWSIoTPythonSDK library on the raspberry Pi and Boto3 on the AWS lambda function.

Also, I've read a possible solution by using AWS IoT shadow, but this solution is so close to being done - I do not want to abandon my effort when it seems to be one line of code that is not working. Send data from cloud to aws iot thing

Please let me know any ideas for how to troubleshoot this further.

So far I have tried printing the stack after the subscribe function and it outputs this from the stack: * I didn't let the whole loop finish*

pi@raspberrypi:~/eve-pi $ pi@raspberrypi:~/eve-pi $ python3 sensor_random.py
    for line in traceback.format_stack():
File "sensor_random.py", line 66, in <module>
    for line in traceback.format_stack():
^CTraceback (most recent call last):
  File "sensor_random.py", line 68, in <module>
    time.sleep(2)
KeyboardInterrupt

-bash: pi@raspberrypi:~/eve-pi: No such file or directory

Here is the Raspberry Pi code: *****Omitted publish code*********

import json
import time
import pytz
import traceback
import inspect
from time import sleep
from datetime import date, datetime
from random import randint
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

# AWS IoT certificate based connection
# MQQT client is an ID so that the MQTT broker can identify the client
myMQTTClient = AWSIoTMQTTClient("XXXXXXXX")
# this is the unique thing endpoint with the .503 certificate
myMQTTClient.configureEndpoint("XXXXXXXXX.us-west-2.amazonaws.com", 8883)

myMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myMQTTClient.configureMQTTOperationTimeout(20)  # 5 sec

def customCallback(client, userdata, message):
    traceback.print_stack()
    print('in callback 1')
    print(message.payload)
    print('in callback 2')

def rand_sensor_data():
    print('randomizing sensor data')
    for each in payload:
        each = randint(1, 51)

try:
    rand_sensor_data()
    print(payload)
    msg = json.dumps(payload)
    myMQTTClient.publish("thing01/data", msg, 0)
    print('before subscribe')
    for x in range(5):
        myMQTTClient.subscribe("thing02/water", 0, customCallback)
        for line in traceback.format_stack():
            print(line.strip())
        time.sleep(2)
    print('after subscribe')
except KeyboardInterrupt:
    GPIO.cleanup()
    print('exited')

Here is the AWS lambda code:

import json
import boto3

def lambda_handler(event, context):
   #testing for pi publishing
   message = {
       'topic': 'thing02/water',
       'payload': {'message': 'test'}
   }

   boto3.client(
       'iot-data',
       region_name='us-west-2',
       aws_access_key_id='<access-key>',
       aws_secret_access_key='<secret-access-key'
       ).publish(
           topic='thing02/water',
           payload=json.dumps(message),
           qos=1
           )
   print(json.dumps(message))
1

1 Answers

1
votes

First, the loop around the subscription makes no sense because x is never used and you should only have to subscribe to a topic once. The MQTT client does not poll a topic each time subscribe is called, it notifies the broker it wants all matching messages then just sits back and waits for the broker to send the matching messages it's way until you unsubscribe or disconnect.

You should move the subscribe to before the publish then it's set up and waiting for the response messages before the publish happens, which removes any chance of the client missing the message as it's still try to handle setting up the subscription.