0
votes

Task is to automate pairing and connection process between Arduino and Raspberry Pi over Bluetooth using D-BUS API based on python script.

The Bluetooth module connected to Arduino is: Grove - Serial Bluetooth v3.0.

I am able to automate pairing process. The pairing script does the following in order:

  1. It looks for an Bluetooth module named Slave by creating adapter object and using StartDiscovery method.(naming is done in the Arduino).
  2. Registers Bluetooth agent.
  3. Creates device object and pairs via Pair method if the device is not already paired.

The part of the code that does above steps given below:

register_agent()
start_discovery() 
time.sleep(10) 
for i in range(len(address_list)):
    new_dbus_device = get_device(address_list[i])
    dev_path = new_dbus_device.object_path
    device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
    pair_status = device_properties.Get("org.bluez.Device1", "Paired")
    if not pair_status:
        new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)

Here is what register_agent() does as requested:

def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)

However when I try to call Connect method as documented in device-api of Bluez, it always fails. I have created a custom serial port profile and tried ConnectProfile method but it failed again.

The communication over Bluetooth works if I use deprecated rfcomm tool, or it works if I use python socket module. However I want to avoid using rfcomm due to being deprecated. Regarding using python socket library, the connection works only in rfcomm channel 1, others channels produce Connection Refused error.

Adding MRE, to clarify the question better:

import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import subprocess

from bluezutils import *
from bluetooth import *
from gi.repository import GObject, GLib
from dbus.mainloop.glib import DBusGMainLoop

DBusGMainLoop(set_as_default=True) 

path = "/test/agent"
AGENT_INTERFACE = 'org.bluez.Agent1'
BUS_NAME = 'org.bluez'
bus = dbus.SystemBus() 

device_obj = None
dev_path = None

def set_trusted(path2):
    props = dbus.Interface(bus.get_object("org.bluez", path2),
                    "org.freedesktop.DBus.Properties")
    props.Set("org.bluez.Device1", "Trusted", True)

class Rejected(dbus.DBusException):
    _dbus_error_name = "org.bluez.Error.Rejected"
    
class Agent(dbus.service.Object):
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Release(self):
        print("Release")
        if self.exit_on_release:
            mainloop.quit()

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        print("AuthorizeService (%s, %s)" % (device, uuid))
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        set_trusted(device)
        return "0000" 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="u")
    def RequestPasskey(self, device): 
        set_trusted(device)
        return dbus.UInt32("0000") 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        set_trusted(device)
        return 
        
    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Cancel(self):
        print("Cancel")

def pair_reply():
    print("Device paired and trusted")
    set_trusted(dev_path) 
    
def pair_error(error):
    err_name = error.get_dbus_name()
    if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
        print("Timed out. Cancelling pairing")
        device_obj.CancelPairing()
    else:
        print("Creating device failed: %s" % (error))
    mainloop.quit() 
    
def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)
    
def start_discovery():
    global pi_adapter
    pi_adapter = find_adapter() 
    scan_filter = dict({"DuplicateData": False}) 
    pi_adapter.SetDiscoveryFilter(scan_filter)
    pi_adapter.StartDiscovery()
    
def stop_discovery():
    pi_adapter.StopDiscovery()
    
def get_device(dev_str):
    # use [Service] and [Object path]:
    device_proxy_object = bus.get_object("org.bluez","/org/bluez/hci0/dev_"+dev_str)
    # use [Interface]:
    device1 = dbus.Interface(device_proxy_object,"org.bluez.Device1")
    return device1

def char_changer(text):
    text = text.replace(':', r'_')
    return text

def slave_finder(device_name):
    
    global sublist_normal
    sublist_normal = []
    sublist= []
    address = []
    edited_address = None
    sub = subprocess.run(["hcitool scan"], text = True, shell = True, capture_output=True)
    print(sub.stdout) #string type
    sublist = sub.stdout.split()
    for i in range(len(sublist)):
        if sublist[i] == device_name:
            print(sublist[i-1])
            sublist_normal.append(sublist[i-1])
            edited_address = char_changer(sublist[i-1])
            address.append(edited_address)
    return address
    
def remove_all_paired():
    for i in range(len(sublist_normal)):
        sub = subprocess.run(["bluetoothctl remove " + sublist_normal[i]], text = True, shell = True, capture_output=True)
        time.sleep(1)
    
    
if __name__ == '__main__':
    
    
    pair_status = None
    address_list = slave_finder('Slave') #Arduino bluetooth module named as "Slave", here we are finding it.
    time.sleep(2)
    remove_all_paired() #rfcomm requires repairing after release
    print(sublist_normal)
    if address_list: 
        register_agent()
        start_discovery() 
        time.sleep(10) 
        for i in range(len(address_list)):
            new_dbus_device = get_device(address_list[i])
            dev_path = new_dbus_device.object_path
            device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
            pair_status = device_properties.Get("org.bluez.Device1", "Paired")
            if not pair_status:
                new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
    
    
    mainloop = GLib.MainLoop()
    mainloop.run()

sudo btmon output:

Bluetooth monitor ver 5.50
= Note: Linux version 5.4.83-v7l+ (armv7l)                             0.832473
= Note: Bluetooth subsystem version 2.22                               0.832478
= New Index: DC:A6:32:58:FE:13 (Primary,UART,hci0)              [hci0] 0.832481
= Open Index: DC:A6:32:58:FE:13                                 [hci0] 0.832484
= Index Info: DC:A6:32:5.. (Cypress Semiconductor Corporation)  [hci0] 0.832487
@ MGMT Open: bluetoothd (privileged) version 1.14             {0x0001} 0.832490
@ MGMT Open: btmon (privileged) version 1.14                  {0x0002} 0.832540

So the question is why Connect and ConnectProfile methods are always failing, what do I need to do establish bluetooth communication based on D-BUS API between Arduino and Raspberry Pi?

1
You have not detailed what register_agent() is doing. I am assuming your are registering your own agent; something along the lines of simple-agent?ukBaz
Edited the question. Yes exactly, it does what you said.Mr. Panda
Thanks for updating the question. And what is in the Agent code? What is in pair_reply and pair_error? What outputs are you getting in the terminal and from sudo btmon? Can I draw your attention to How to create a Minimal, Reproducible ExampleukBaz
I have added them as well. getdevice() is a method from bluezutils module. So when I call new_dbus_device.Connect() it generates an error.Mr. Panda

1 Answers

0
votes

I think you had answered your own question. The reason that a Connect doesn't work is because you have no Serial Port Profile (SPP) running on the Raspberry Pi. If you have btmon running you will see that the client disconnects because there is no matching profile with what the Arduino is offering.

The port number used in the Python Socket connection needs to match the port number on the Arduino Bluetooth module. This is probably why only 1 is working.

For reference I tested this with a Raspberry Pi and HC-06 I had laying around. I removed all the scanning code to try to get to the minimum runnable code. Here is the code I used:

import socket
from time import sleep
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib

BUS_NAME = 'org.bluez'
AGENT_IFACE = 'org.bluez.Agent1'
AGNT_MNGR_IFACE = 'org.bluez.AgentManager1'
ADAPTER_IFACE = 'org.bluez.Adapter1'
AGENT_PATH = '/ukBaz/bluezero/agent'
AGNT_MNGR_PATH = '/org/bluez'
DEVICE_IFACE = 'org.bluez.Device1'
CAPABILITY = 'KeyboardDisplay'
my_adapter_address = '11:22:33:44:55:66'
my_device_path = '/org/bluez/hci0/dev_00_00_12_34_56_78'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()


class Agent(dbus.service.Object):

    @dbus.service.method(AGENT_IFACE,
                         in_signature='o', out_signature='s')
    def RequestPinCode(self, device):
        print(f'RequestPinCode {device}')
        return '0000'


class Device:
    def __init__(self, device_path):
        dev_obj = bus.get_object(BUS_NAME, device_path)
        self.methods = dbus.Interface(dev_obj, DEVICE_IFACE)
        self.props = dbus.Interface(dev_obj, dbus.PROPERTIES_IFACE)
        self._port = 1
        self._client_sock = socket.socket(socket.AF_BLUETOOTH,
                                          socket.SOCK_STREAM,
                                          socket.BTPROTO_RFCOMM)

    def connect(self):
        # self.methods.Connect()
        self._client_sock.bind((my_adapter_address, self._port))
        self._client_sock.connect((self.address, self._port))

    def disconnect(self):
        self.methods.Disconnect()

    def pair(self):
        self.methods.Pair(reply_handler=self._pair_reply,
                          error_handler=self._pair_error)

    def _pair_reply(self):
        print(f'Device trusted={self.trusted}, connected={self.connected}')
        self.trusted = True
        print(f'Device trusted={self.trusted}, connected={self.connected}')
        while self.connected:
            sleep(0.5)
        self.connect()
        print('Successfully paired and connected')

    def _pair_error(self, error):
        err_name = error.get_dbus_name()
        print(f'Creating device failed: {err_name}')

    @property
    def trusted(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Trusted'))

    @trusted.setter
    def trusted(self, value):
        self.props.Set(DEVICE_IFACE, 'Trusted', bool(value))

    @property
    def paired(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Paired'))

    @property
    def connected(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Connected'))

    @property
    def address(self):
        return str(self.props.Get(DEVICE_IFACE, 'Address'))


if __name__ == '__main__':
    agent = Agent(bus, AGENT_PATH)

    agnt_mngr = dbus.Interface(bus.get_object(BUS_NAME, AGNT_MNGR_PATH),
                               AGNT_MNGR_IFACE)
    agnt_mngr.RegisterAgent(AGENT_PATH, CAPABILITY)

    device = Device(my_device_path)
    if device.paired:
        device.connect()
    else:
        device.pair()


    mainloop = GLib.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        agnt_mngr.UnregisterAgent(AGENT_PATH)
        mainloop.quit()