Add usbguard-notify.py
This commit is contained in:
parent
da3292d2c7
commit
8043218190
111
usbguard-notify.py
Executable file
111
usbguard-notify.py
Executable file
@ -0,0 +1,111 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import shutil
|
||||
import re
|
||||
import threading
|
||||
from subprocess import Popen, run, DEVNULL, PIPE
|
||||
|
||||
USBGUARD_EXEC_NAME = shutil.which('usbguard')
|
||||
DUNSTIFY_EXEC_NAME = shutil.which('dunstify')
|
||||
open_notifications = {}
|
||||
|
||||
|
||||
def parse_event_type_and_id(stream):
|
||||
line = stream.readline()
|
||||
if not line.startswith('[device] '):
|
||||
return None
|
||||
event_type = re.findall('(?<=\\[device\\] )[a-zA-Z]+', line)
|
||||
if len(event_type) == 0:
|
||||
return None
|
||||
event_id = re.findall('(?<=id=)[0-9]+', line)
|
||||
if len(event_id) == 0:
|
||||
return None
|
||||
return event_type[0], int(event_id[0])
|
||||
|
||||
|
||||
def parse_event_properties(stream, count):
|
||||
props = {}
|
||||
for _ in range(count):
|
||||
line = stream.readline()
|
||||
try:
|
||||
sep_ind = line.index('=')
|
||||
prop_name = line[1:sep_ind]
|
||||
props[prop_name] = line[sep_ind + 1:-1]
|
||||
if prop_name == 'device_rule':
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
return props
|
||||
|
||||
|
||||
def get_name_and_id_from_rule(rule):
|
||||
name = re.findall('(?<=name ")[^"]+(?=")', rule)
|
||||
if len(name) == 0:
|
||||
name = ''
|
||||
else:
|
||||
name = name[0]
|
||||
id = re.findall('(?<=id )[a-z0-9]{4}:[a-z0-9]{4}', rule)
|
||||
if len(id) == 0:
|
||||
id = ''
|
||||
else:
|
||||
id = id[0]
|
||||
return name, id
|
||||
|
||||
|
||||
def prompt_device_action(dev_id, name, long_id):
|
||||
proc = Popen([DUNSTIFY_EXEC_NAME, '-p',
|
||||
'-A', '2,Block',
|
||||
'-A', '1,Allow',
|
||||
'-A', '0,Reject',
|
||||
f'{name} ({long_id})',
|
||||
'New Device'],
|
||||
stdout=PIPE, text=True, bufsize=0)
|
||||
open_notifications[dev_id] = int(proc.stdout.readline())
|
||||
print(str(open_notifications[dev_id]))
|
||||
option = int(proc.communicate()[0])
|
||||
try:
|
||||
open_notifications.pop(dev_id)
|
||||
except KeyError:
|
||||
pass
|
||||
match option:
|
||||
case 0:
|
||||
run([USBGUARD_EXEC_NAME, 'reject-device', long_id])
|
||||
|
||||
case 1:
|
||||
run([USBGUARD_EXEC_NAME, 'allow-device', long_id])
|
||||
|
||||
case 2:
|
||||
run([USBGUARD_EXEC_NAME, 'block-device', long_id])
|
||||
|
||||
|
||||
def close_notification(dev_id):
|
||||
if dev_id in open_notifications:
|
||||
notif_id = open_notifications.pop(dev_id)
|
||||
run([DUNSTIFY_EXEC_NAME, '-C', str(notif_id)])
|
||||
|
||||
|
||||
with Popen([USBGUARD_EXEC_NAME, 'watch'],
|
||||
stdin=DEVNULL, stdout=PIPE, text=True, bufsize=0) as usbguard_proc:
|
||||
new_devices = set()
|
||||
usbguard_proc.stdout.readline() # get rid of initial connection message
|
||||
while True:
|
||||
event_type_result = parse_event_type_and_id(usbguard_proc.stdout)
|
||||
if event_type_result is None:
|
||||
continue
|
||||
event_type, dev_id = event_type_result
|
||||
if event_type not in ['PresenceChanged', 'PolicyApplied']:
|
||||
continue
|
||||
props = parse_event_properties(usbguard_proc.stdout, 3)
|
||||
name, long_id = get_name_and_id_from_rule(props['device_rule'])
|
||||
match event_type:
|
||||
case 'PresenceChanged':
|
||||
if props['event'] == 'Insert':
|
||||
new_devices.add(dev_id)
|
||||
else:
|
||||
close_notification(dev_id)
|
||||
new_devices.discard(dev_id)
|
||||
case 'PolicyApplied':
|
||||
if props['target_new'] == 'block':
|
||||
threading.Thread(target=prompt_device_action,
|
||||
args=(dev_id, name, long_id)).start()
|
||||
new_devices.discard(dev_id)
|
Loading…
Reference in New Issue
Block a user