diff --git a/usbguard-notify.py b/usbguard-notify.py index afdfead..750ed8a 100755 --- a/usbguard-notify.py +++ b/usbguard-notify.py @@ -5,19 +5,19 @@ import re import threading from subprocess import Popen, run, DEVNULL, PIPE -USBGUARD_EXEC_NAME = shutil.which('usbguard') -DUNSTIFY_EXEC_NAME = shutil.which('dunstify') +USBGUARD_EXEC_NAME = shutil.which("usbguard") +DUNSTIFY_EXEC_NAME = shutil.which("dunstify") open_notifications = {} def parse_event_type_and_id(stream): line = stream.readline() - if not line.startswith('[device] '): + if not line.startswith("[device] "): return None - event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line) + event_type = re.findall("(?<=\\[device\\] )[a-zA-Z]+", line) if len(event_type) == 0: return None - event_id = re.findall('(?<=id=)[0-9]+', line) + event_id = re.findall("(?<=id=)[0-9]+", line) if len(event_id) == 0: return None return event_type[0], int(event_id[0]) @@ -28,10 +28,10 @@ def parse_event_properties(stream, count): for _ in range(count): line = stream.readline() try: - sep_ind = line.index('=') + sep_ind = line.index("=") prop_name = line[1:sep_ind] - props[prop_name] = line[sep_ind + 1:-1] - if prop_name == 'device_rule': + props[prop_name] = line[sep_ind + 1 : -1] + if prop_name == "device_rule": break except ValueError: continue @@ -41,25 +41,35 @@ def parse_event_properties(stream, count): def get_name_and_id_from_rule(rule): name = re.findall('(?<=name ")[^"]+(?=")', rule) if len(name) == 0: - name = '' + name = "" else: name = name[0] - id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule) + id = re.findall("(?<=id )[a-z0-9]{4}:[a-z0-9]{4}", rule) if len(id) == 0: - id = '' + id = "" else: id = id[0] return name, id def prompt_device_action(dev_id, name, long_id): - proc = Popen([DUNSTIFY_EXEC_NAME, '-p', - '-A', 'block,Block', - '-A', 'allow,Allow', - '-A', 'reject,Reject', - f'{name} ({long_id})', - 'New Device'], - stdout=PIPE, text=True, bufsize=0) + proc = Popen( + [ + DUNSTIFY_EXEC_NAME, + "-p", + "-A", + "block,Block", + "-A", + "allow,Allow", + "-A", + "reject,Reject", + f"{name} ({long_id})", + "New Device", + ], + stdout=PIPE, + text=True, + bufsize=0, + ) open_notifications[dev_id] = int(proc.stdout.readline()) option = proc.communicate()[0][:-1] try: @@ -67,24 +77,29 @@ def prompt_device_action(dev_id, name, long_id): except KeyError: pass match option: - case 'reject': - run([USBGUARD_EXEC_NAME, 'reject-device', long_id]) + case "reject": + run([USBGUARD_EXEC_NAME, "reject-device", long_id]) - case 'allow': - run([USBGUARD_EXEC_NAME, 'allow-device', long_id]) + case "allow": + run([USBGUARD_EXEC_NAME, "allow-device", long_id]) case _: - run([USBGUARD_EXEC_NAME, 'block-device', long_id]) + run([USBGUARD_EXEC_NAME, "block-device", long_id]) def close_notification(dev_id): if dev_id in open_notifications: notif_id = open_notifications.pop(dev_id) - run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)]) + run([DUNSTIFY_EXEC_NAME, "-C", str(notif_id)]) -with Popen([USBGUARD_EXEC_NAME, 'watch'], - stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc: +with Popen( + [USBGUARD_EXEC_NAME, "watch"], + stdin=DEVNULL, + stdout=PIPE, + text=True, + bufsize=0, +) as usbguard_proc: new_devices = set() usbguard_proc.stdout.readline() # get rid of initial connection message while True: @@ -92,19 +107,21 @@ with Popen([USBGUARD_EXEC_NAME, 'watch'], if event_type_result is None: continue event_type, dev_id = event_type_result - if event_type not in ['PresenceChanged', 'PolicyApplied']: + if event_type not in ["PresenceChanged", "PolicyApplied"]: continue props = parse_event_properties(usbguard_proc.stdout, 3) - name, long_id = get_name_and_id_from_rule(props['device_rule']) + name, long_id = get_name_and_id_from_rule(props["device_rule"]) match event_type: - case 'PresenceChanged': - if props['event'] == 'Insert': + case "PresenceChanged": + if props["event"] == "Insert": new_devices.add(dev_id) else: close_notification(dev_id) new_devices.discard(dev_id) - case 'PolicyApplied': - if props['target_new'] == 'block': - threading.Thread(target=prompt_device_action, - args=(dev_id, name, long_id)).start() + case "PolicyApplied": + if props["target_new"] == "block": + threading.Thread( + target=prompt_device_action, + args=(dev_id, name, long_id), + ).start() new_devices.discard(dev_id)