#!/usr/bin/python3 import shutil import re import threading from subprocess import Popen, run, DEVNULL, PIPE USBGUARD_EXEC_NAME = shutil.which('usbguard') DUNSTIFY_EXEC_NAME = shutil.which('dunstify') open_notifications = {} def parse_event_type_and_id(stream): line = stream.readline() if not line.startswith('[device] '): return None event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line) if len(event_type) == 0: return None event_id = re.findall('(?<=id=)[0-9]+', line) if len(event_id) == 0: return None return event_type[0], int(event_id[0]) def parse_event_properties(stream, count): props = {} for _ in range(count): line = stream.readline() try: sep_ind = line.index('=') prop_name = line[1:sep_ind] props[prop_name] = line[sep_ind + 1:-1] if prop_name == 'device_rule': break except ValueError: continue return props def get_name_and_id_from_rule(rule): name = re.findall('(?<=name ")[^"]+(?=")', rule) if len(name) == 0: name = '' else: name = name[0] id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule) if len(id) == 0: id = '' else: id = id[0] return name, id def prompt_device_action(dev_id, name, long_id): proc = Popen([DUNSTIFY_EXEC_NAME, '-p', '-A', 'block,Block', '-A', 'allow,Allow', '-A', 'reject,Reject', f'{name} ({long_id})', 'New Device'], stdout=PIPE, text=True, bufsize=0) open_notifications[dev_id] = int(proc.stdout.readline()) option = proc.communicate()[0][:-1] try: open_notifications.pop(dev_id) except KeyError: pass match option: case 'reject': run([USBGUARD_EXEC_NAME, 'reject-device', long_id]) case 'allow': run([USBGUARD_EXEC_NAME, 'allow-device', long_id]) case _: run([USBGUARD_EXEC_NAME, 'block-device', long_id]) def close_notification(dev_id): if dev_id in open_notifications: notif_id = open_notifications.pop(dev_id) run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)]) with Popen([USBGUARD_EXEC_NAME, 'watch'], stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc: new_devices = set() usbguard_proc.stdout.readline() # get rid of initial connection message while True: event_type_result = parse_event_type_and_id(usbguard_proc.stdout) if event_type_result is None: continue event_type, dev_id = event_type_result if event_type not in ['PresenceChanged', 'PolicyApplied']: continue props = parse_event_properties(usbguard_proc.stdout, 3) name, long_id = get_name_and_id_from_rule(props['device_rule']) match event_type: case 'PresenceChanged': if props['event'] == 'Insert': new_devices.add(dev_id) else: close_notification(dev_id) new_devices.discard(dev_id) case 'PolicyApplied': if props['target_new'] == 'block': threading.Thread(target=prompt_device_action, args=(dev_id, name, long_id)).start() new_devices.discard(dev_id)