I'm surprised this is proving to be difficult to find.

I need to detect when a USB block device with a specific partition label is added (plugged in) using python3.

Is there a way to use pyudev to provide a list of USB block devices? How can I specify a filter with subsystem="block" AND subsystem="usb"? They seem to be mutually exclusive filters.

When a USB device having a partition named "XYZ" is plugged in, I need to run a script to mount it and run a program that uses the data on that partition.

1 Answer

I have tried too many variations to count, from various udev rules, systemd units, many scripts and combinations thereof, but did not have any success until I used the following code. It worked but caused 100% CPU load. When I added a sleep to the while loop at the end, it no longer worked at all, and it even prevented PCManFM from automounting.

The issue was in the usbEvent.py process. I could run it from the command line and it worked just fine. The first thing it does is use Popen to call "grep devName /proc/mounts" to wait for the automounter to mount the partition. The Popen is called in a loop, and adding some time.sleep eliminated the CPU burden, though that was surprising since the mount point appears within a few seconds.

There seems to be some interplay between the code below, which systemd runs, and the usbEvent.py process it spawns that I don't fully understand. They are separate processes, so I would think they should be quite independent of each other.

The usbEvent.py handler works, but it takes much longer to recognize the mount and continue. While it's running it consumes around 5% of the CPU, and only 0.3% when it finishes. It doesn't end when the timeout is over, which must be due to p.communicate. But if p.poll does NOT return None, the process should be complete and p.communicate should not block... but it does! Why?

The platform is a Raspberry Pi 4 with 8GB RAM and the January 2021 Raspberry Pi OS release.

#!/usr/bin/env python3
import os
import time
import subprocess as sp

import pyudev

# This code is run on boot via systemd to detect when
# my custom USB storage device (USB stick, SSD etc) 
# is inserted or removed. It spawns a new process to 
# handle the event.

context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by('block', device_type="partition")


def log_event(action, device):
    devName = device.get('DEVNAME')
    devLabel = device.get('ID_FS_LABEL')
    if devLabel == "MY_CUSTOM_USB":
        sp.Popen(["/home/user/bin/customUSB/usbEvent.py",
                  action, devName, devLabel],
                 stdin=sp.DEVNULL, stdout=sp.DEVNULL, stderr=sp.DEVNULL)


observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()

while True:
#    pass
    time.sleep(0.1)
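
On the original question of combining subsystem="block" with USB: one possible approach, sketched here rather than tested on the Pi, is to keep the block filter and check the device's udev properties instead. The sketch assumes udev sets ID_BUS to "usb" for USB storage on this platform, and the names is_labeled_usb_partition and watch are made up for illustration. It also uses monitor.poll(), which blocks until an event arrives, in place of the observer thread and the sleep loop.

```python
def is_labeled_usb_partition(props, label):
    """Return True if udev properties describe a USB partition with this label.

    `props` is anything with a dict-style .get(), such as the device object
    passed to the handler in the script above.
    """
    return (
        props.get("DEVTYPE") == "partition"
        and props.get("ID_BUS") == "usb"  # assumption: set by udev for USB storage
        and props.get("ID_FS_LABEL") == label
    )


def watch(label, on_event):
    """Block forever, calling on_event(action, device) for matching partitions."""
    import pyudev  # imported lazily so the predicate above works without it

    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    monitor.filter_by("block", device_type="partition")
    # monitor.poll() waits for the next udev event, so the process sleeps
    # until something is plugged in or removed.
    for device in iter(monitor.poll, None):
        if is_labeled_usb_partition(device, label):
            on_event(device.action, device)
```

Calling watch("MY_CUSTOM_USB", handler) would then take the place of the observer and the while loop, with handler(action, device) doing the Popen.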

Here is the portion of the usbEvent.py handler that I changed to get it working:

# Waits for mount point of "dev" to appear and returns it. Communicate
def getMountPoint(dev):
    out = ""
    interval = 0.1
    timeout = 5 / interval
    while timeout > 0:
        p = sp.Popen(["grep", dev, "/proc/mounts"],
                     text=True, stdout=sp.PIPE, stderr=sp.PIPE)
        retCode = p.poll()
        if retCode is None:
            time.sleep(interval)
        else:
            out, err = p.communicate()  # This should not block but does!
            if retCode == 0 and len(out) > 0:
                out = out.split()[1]
                break
            else:
                lg.info(f"exit code: {retCode} Error: {err}")
                exit(1)
    if timeout == 0:
        p.terminate()
    return out
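
Two things stand out in the function above: timeout is never decremented, so the while loop has no way to expire, and every pass starts a new grep process without waiting for the previous one. For comparison, here is a minimal sketch that reads /proc/mounts directly and enforces a real deadline. The names find_mount_point and wait_for_mount_point and the mounts_path parameter are illustrative, not part of usbEvent.py.

```python
import time


def find_mount_point(dev, mounts_text):
    """Return the mount point of `dev` in /proc/mounts-style text, or None."""
    for line in mounts_text.splitlines():
        fields = line.split()
        # Field 0 is the device node, field 1 is the mount point.
        # Spaces in a mount point appear as \040 and are not decoded here.
        if len(fields) >= 2 and fields[0] == dev:
            return fields[1]
    return None


def wait_for_mount_point(dev, timeout=5.0, interval=0.1,
                         mounts_path="/proc/mounts"):
    """Poll the mounts file until `dev` is mounted or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        with open(mounts_path) as f:
            mount_point = find_mount_point(dev, f.read())
        if mount_point is not None or time.monotonic() >= deadline:
            return mount_point  # None means the wait timed out
        time.sleep(interval)
```

Matching the first field exactly also avoids a false hit that a plain grep could give, for example "/dev/sda1" matching a line for "/dev/sda10".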