Merge branch 'main' of git.zander.im:Zander671/random-scripts
This commit is contained in:
		| @ -5,19 +5,19 @@ import re | ||||
| import threading | ||||
| from subprocess import Popen, run, DEVNULL, PIPE | ||||
|  | ||||
| USBGUARD_EXEC_NAME = shutil.which('usbguard') | ||||
| DUNSTIFY_EXEC_NAME = shutil.which('dunstify') | ||||
| USBGUARD_EXEC_NAME = shutil.which("usbguard") | ||||
| DUNSTIFY_EXEC_NAME = shutil.which("dunstify") | ||||
| open_notifications = {} | ||||
|  | ||||
|  | ||||
| def parse_event_type_and_id(stream): | ||||
|     line = stream.readline() | ||||
|     if not line.startswith('[device] '): | ||||
|     if not line.startswith("[device] "): | ||||
|         return None | ||||
|     event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line) | ||||
|     event_type = re.findall("(?<=\\[device\\] )[a-zA-Z]+", line) | ||||
|     if len(event_type) == 0: | ||||
|         return None | ||||
|     event_id = re.findall('(?<=id=)[0-9]+', line) | ||||
|     event_id = re.findall("(?<=id=)[0-9]+", line) | ||||
|     if len(event_id) == 0: | ||||
|         return None | ||||
|     return event_type[0], int(event_id[0]) | ||||
| @ -28,10 +28,10 @@ def parse_event_properties(stream, count): | ||||
|     for _ in range(count): | ||||
|         line = stream.readline() | ||||
|         try: | ||||
|             sep_ind = line.index('=') | ||||
|             sep_ind = line.index("=") | ||||
|             prop_name = line[1:sep_ind] | ||||
|             props[prop_name] = line[sep_ind + 1 : -1] | ||||
|             if prop_name == 'device_rule': | ||||
|             if prop_name == "device_rule": | ||||
|                 break | ||||
|         except ValueError: | ||||
|             continue | ||||
| @ -41,25 +41,35 @@ def parse_event_properties(stream, count): | ||||
| def get_name_and_id_from_rule(rule): | ||||
|     name = re.findall('(?<=name ")[^"]+(?=")', rule) | ||||
|     if len(name) == 0: | ||||
|         name = '' | ||||
|         name = "" | ||||
|     else: | ||||
|         name = name[0] | ||||
|     id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule) | ||||
|     id = re.findall("(?<=id )[a-z0-9]{4}:[a-z0-9]{4}", rule) | ||||
|     if len(id) == 0: | ||||
|         id = '' | ||||
|         id = "" | ||||
|     else: | ||||
|         id = id[0] | ||||
|     return name, id | ||||
|  | ||||
|  | ||||
| def prompt_device_action(dev_id, name, long_id): | ||||
|     proc = Popen([DUNSTIFY_EXEC_NAME, '-p', | ||||
|                   '-A', 'block,Block', | ||||
|                   '-A', 'allow,Allow', | ||||
|                   '-A', 'reject,Reject', | ||||
|                   f'{name} ({long_id})', | ||||
|                   'New Device'], | ||||
|                  stdout=PIPE, text=True, bufsize=0) | ||||
|     proc = Popen( | ||||
|         [ | ||||
|             DUNSTIFY_EXEC_NAME, | ||||
|             "-p", | ||||
|             "-A", | ||||
|             "block,Block", | ||||
|             "-A", | ||||
|             "allow,Allow", | ||||
|             "-A", | ||||
|             "reject,Reject", | ||||
|             f"{name} ({long_id})", | ||||
|             "New Device", | ||||
|         ], | ||||
|         stdout=PIPE, | ||||
|         text=True, | ||||
|         bufsize=0, | ||||
|     ) | ||||
|     open_notifications[dev_id] = int(proc.stdout.readline()) | ||||
|     option = proc.communicate()[0][:-1] | ||||
|     try: | ||||
| @ -67,24 +77,29 @@ def prompt_device_action(dev_id, name, long_id): | ||||
|     except KeyError: | ||||
|         pass | ||||
|     match option: | ||||
|         case 'reject': | ||||
|             run([USBGUARD_EXEC_NAME, 'reject-device', long_id]) | ||||
|         case "reject": | ||||
|             run([USBGUARD_EXEC_NAME, "reject-device", long_id]) | ||||
|  | ||||
|         case 'allow': | ||||
|             run([USBGUARD_EXEC_NAME, 'allow-device', long_id]) | ||||
|         case "allow": | ||||
|             run([USBGUARD_EXEC_NAME, "allow-device", long_id]) | ||||
|  | ||||
|         case _: | ||||
|             run([USBGUARD_EXEC_NAME, 'block-device', long_id]) | ||||
|             run([USBGUARD_EXEC_NAME, "block-device", long_id]) | ||||
|  | ||||
|  | ||||
| def close_notification(dev_id): | ||||
|     if dev_id in open_notifications: | ||||
|         notif_id = open_notifications.pop(dev_id) | ||||
|         run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)]) | ||||
|         run([DUNSTIFY_EXEC_NAME, "-C", str(notif_id)]) | ||||
|  | ||||
|  | ||||
| with Popen([USBGUARD_EXEC_NAME, 'watch'], | ||||
|            stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc: | ||||
| with Popen( | ||||
|     [USBGUARD_EXEC_NAME, "watch"], | ||||
|     stdin=DEVNULL, | ||||
|     stdout=PIPE, | ||||
|     text=True, | ||||
|     bufsize=0, | ||||
| ) as usbguard_proc: | ||||
|     new_devices = set() | ||||
|     usbguard_proc.stdout.readline()  # get rid of initial connection message | ||||
|     while True: | ||||
| @ -92,19 +107,21 @@ with Popen([USBGUARD_EXEC_NAME, 'watch'], | ||||
|         if event_type_result is None: | ||||
|             continue | ||||
|         event_type, dev_id = event_type_result | ||||
|         if event_type not in ['PresenceChanged', 'PolicyApplied']: | ||||
|         if event_type not in ["PresenceChanged", "PolicyApplied"]: | ||||
|             continue | ||||
|         props = parse_event_properties(usbguard_proc.stdout, 3) | ||||
|         name, long_id = get_name_and_id_from_rule(props['device_rule']) | ||||
|         name, long_id = get_name_and_id_from_rule(props["device_rule"]) | ||||
|         match event_type: | ||||
|             case 'PresenceChanged': | ||||
|                 if props['event'] == 'Insert': | ||||
|             case "PresenceChanged": | ||||
|                 if props["event"] == "Insert": | ||||
|                     new_devices.add(dev_id) | ||||
|                 else: | ||||
|                     close_notification(dev_id) | ||||
|                     new_devices.discard(dev_id) | ||||
|             case 'PolicyApplied': | ||||
|                 if props['target_new'] == 'block': | ||||
|                     threading.Thread(target=prompt_device_action, | ||||
|                                      args=(dev_id, name, long_id)).start() | ||||
|             case "PolicyApplied": | ||||
|                 if props["target_new"] == "block": | ||||
|                     threading.Thread( | ||||
|                         target=prompt_device_action, | ||||
|                         args=(dev_id, name, long_id), | ||||
|                     ).start() | ||||
|                     new_devices.discard(dev_id) | ||||
|  | ||||
		Reference in New Issue
	
	Block a user