Update formatting in usbguard-notify.py

This commit is contained in:
Alexander Rosenberg 2025-05-05 09:12:53 +09:00
parent ad3a00266a
commit 4f0093975b
Signed by: Zander671
GPG Key ID: 5FD0394ADBD72730

View File

@ -5,19 +5,19 @@ import re
import threading import threading
from subprocess import Popen, run, DEVNULL, PIPE from subprocess import Popen, run, DEVNULL, PIPE
USBGUARD_EXEC_NAME = shutil.which('usbguard') USBGUARD_EXEC_NAME = shutil.which("usbguard")
DUNSTIFY_EXEC_NAME = shutil.which('dunstify') DUNSTIFY_EXEC_NAME = shutil.which("dunstify")
open_notifications = {} open_notifications = {}
def parse_event_type_and_id(stream): def parse_event_type_and_id(stream):
line = stream.readline() line = stream.readline()
if not line.startswith('[device] '): if not line.startswith("[device] "):
return None return None
event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line) event_type = re.findall("(?<=\\[device\\] )[a-zA-Z]+", line)
if len(event_type) == 0: if len(event_type) == 0:
return None return None
event_id = re.findall('(?<=id=)[0-9]+', line) event_id = re.findall("(?<=id=)[0-9]+", line)
if len(event_id) == 0: if len(event_id) == 0:
return None return None
return event_type[0], int(event_id[0]) return event_type[0], int(event_id[0])
@ -28,10 +28,10 @@ def parse_event_properties(stream, count):
for _ in range(count): for _ in range(count):
line = stream.readline() line = stream.readline()
try: try:
sep_ind = line.index('=') sep_ind = line.index("=")
prop_name = line[1:sep_ind] prop_name = line[1:sep_ind]
props[prop_name] = line[sep_ind + 1:-1] props[prop_name] = line[sep_ind + 1 : -1]
if prop_name == 'device_rule': if prop_name == "device_rule":
break break
except ValueError: except ValueError:
continue continue
@ -41,25 +41,35 @@ def parse_event_properties(stream, count):
def get_name_and_id_from_rule(rule): def get_name_and_id_from_rule(rule):
name = re.findall('(?<=name ")[^"]+(?=")', rule) name = re.findall('(?<=name ")[^"]+(?=")', rule)
if len(name) == 0: if len(name) == 0:
name = '' name = ""
else: else:
name = name[0] name = name[0]
id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule) id = re.findall("(?<=id )[a-z0-9]{4}:[a-z0-9]{4}", rule)
if len(id) == 0: if len(id) == 0:
id = '' id = ""
else: else:
id = id[0] id = id[0]
return name, id return name, id
def prompt_device_action(dev_id, name, long_id): def prompt_device_action(dev_id, name, long_id):
proc = Popen([DUNSTIFY_EXEC_NAME, '-p', proc = Popen(
'-A', 'block,Block', [
'-A', 'allow,Allow', DUNSTIFY_EXEC_NAME,
'-A', 'reject,Reject', "-p",
f'{name} ({long_id})', "-A",
'New Device'], "block,Block",
stdout=PIPE, text=True, bufsize=0) "-A",
"allow,Allow",
"-A",
"reject,Reject",
f"{name} ({long_id})",
"New Device",
],
stdout=PIPE,
text=True,
bufsize=0,
)
open_notifications[dev_id] = int(proc.stdout.readline()) open_notifications[dev_id] = int(proc.stdout.readline())
option = proc.communicate()[0][:-1] option = proc.communicate()[0][:-1]
try: try:
@ -67,24 +77,29 @@ def prompt_device_action(dev_id, name, long_id):
except KeyError: except KeyError:
pass pass
match option: match option:
case 'reject': case "reject":
run([USBGUARD_EXEC_NAME, 'reject-device', long_id]) run([USBGUARD_EXEC_NAME, "reject-device", long_id])
case 'allow': case "allow":
run([USBGUARD_EXEC_NAME, 'allow-device', long_id]) run([USBGUARD_EXEC_NAME, "allow-device", long_id])
case _: case _:
run([USBGUARD_EXEC_NAME, 'block-device', long_id]) run([USBGUARD_EXEC_NAME, "block-device", long_id])
def close_notification(dev_id): def close_notification(dev_id):
if dev_id in open_notifications: if dev_id in open_notifications:
notif_id = open_notifications.pop(dev_id) notif_id = open_notifications.pop(dev_id)
run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)]) run([DUNSTIFY_EXEC_NAME, "-C", str(notif_id)])
with Popen([USBGUARD_EXEC_NAME, 'watch'], with Popen(
stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc: [USBGUARD_EXEC_NAME, "watch"],
stdin=DEVNULL,
stdout=PIPE,
text=True,
bufsize=0,
) as usbguard_proc:
new_devices = set() new_devices = set()
usbguard_proc.stdout.readline() # get rid of initial connection message usbguard_proc.stdout.readline() # get rid of initial connection message
while True: while True:
@ -92,19 +107,21 @@ with Popen([USBGUARD_EXEC_NAME, 'watch'],
if event_type_result is None: if event_type_result is None:
continue continue
event_type, dev_id = event_type_result event_type, dev_id = event_type_result
if event_type not in ['PresenceChanged', 'PolicyApplied']: if event_type not in ["PresenceChanged", "PolicyApplied"]:
continue continue
props = parse_event_properties(usbguard_proc.stdout, 3) props = parse_event_properties(usbguard_proc.stdout, 3)
name, long_id = get_name_and_id_from_rule(props['device_rule']) name, long_id = get_name_and_id_from_rule(props["device_rule"])
match event_type: match event_type:
case 'PresenceChanged': case "PresenceChanged":
if props['event'] == 'Insert': if props["event"] == "Insert":
new_devices.add(dev_id) new_devices.add(dev_id)
else: else:
close_notification(dev_id) close_notification(dev_id)
new_devices.discard(dev_id) new_devices.discard(dev_id)
case 'PolicyApplied': case "PolicyApplied":
if props['target_new'] == 'block': if props["target_new"] == "block":
threading.Thread(target=prompt_device_action, threading.Thread(
args=(dev_id, name, long_id)).start() target=prompt_device_action,
args=(dev_id, name, long_id),
).start()
new_devices.discard(dev_id) new_devices.discard(dev_id)