From 8043218190c85c0f9ad38a0751547715a6d4459e Mon Sep 17 00:00:00 2001 From: Alexander Rosenberg Date: Wed, 17 May 2023 17:21:35 -0700 Subject: [PATCH] Add usbguard-notify.py --- usbguard-notify.py | 111 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100755 usbguard-notify.py diff --git a/usbguard-notify.py b/usbguard-notify.py new file mode 100755 index 0000000..5a7575b --- /dev/null +++ b/usbguard-notify.py @@ -0,0 +1,111 @@ +#!/usr/bin/python3 + +import shutil +import re +import threading +from subprocess import Popen, run, DEVNULL, PIPE + +USBGUARD_EXEC_NAME = shutil.which('usbguard') +DUNSTIFY_EXEC_NAME = shutil.which('dunstify') +open_notifications = {} + + +def parse_event_type_and_id(stream): + line = stream.readline() + if not line.startswith('[device] '): + return None + event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line) + if len(event_type) == 0: + return None + event_id = re.findall('(?<=id=)[0-9]+', line) + if len(event_id) == 0: + return None + return event_type[0], int(event_id[0]) + + +def parse_event_properties(stream, count): + props = {} + for _ in range(count): + line = stream.readline() + try: + sep_ind = line.index('=') + prop_name = line[1:sep_ind] + props[prop_name] = line[sep_ind + 1:-1] + if prop_name == 'device_rule': + break + except ValueError: + continue + return props + + +def get_name_and_id_from_rule(rule): + name = re.findall('(?<=name ")[^"]+(?=")', rule) + if len(name) == 0: + name = '' + else: + name = name[0] + id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule) + if len(id) == 0: + id = '' + else: + id = id[0] + return name, id + + +def prompt_device_action(dev_id, name, long_id): + proc = Popen([DUNSTIFY_EXEC_NAME, '-p', + '-A', '2,Block', + '-A', '1,Allow', + '-A', '0,Reject', + f'{name} ({long_id})', + 'New Device'], + stdout=PIPE, text=True, bufsize=0) + open_notifications[dev_id] = int(proc.stdout.readline()) + print(str(open_notifications[dev_id])) + option = int(proc.communicate()[0]) + try: + open_notifications.pop(dev_id) + except KeyError: + pass + match option: + case 0: + run([USBGUARD_EXEC_NAME, 'reject-device', long_id]) + + case 1: + run([USBGUARD_EXEC_NAME, 'allow-device', long_id]) + + case 2: + run([USBGUARD_EXEC_NAME, 'block-device', long_id]) + + +def close_notification(dev_id): + if dev_id in open_notifications: + notif_id = open_notifications.pop(dev_id) + run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)]) + + +with Popen([USBGUARD_EXEC_NAME, 'watch'], + stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc: + new_devices = set() + usbguard_proc.stdout.readline() # get rid of initial connection message + while True: + event_type_result = parse_event_type_and_id(usbguard_proc.stdout) + if event_type_result is None: + continue + event_type, dev_id = event_type_result + if event_type not in ['PresenceChanged', 'PolicyApplied']: + continue + props = parse_event_properties(usbguard_proc.stdout, 3) + name, long_id = get_name_and_id_from_rule(props['device_rule']) + match event_type: + case 'PresenceChanged': + if props['event'] == 'Insert': + new_devices.add(dev_id) + else: + close_notification(dev_id) + new_devices.discard(dev_id) + case 'PolicyApplied': + if props['target_new'] == 'block': + threading.Thread(target=prompt_device_action, + args=(dev_id, name, long_id)).start() + new_devices.discard(dev_id)