Fix set-initial-kanshi-state and usbguard-notify.py
This commit is contained in:
@ -21,7 +21,7 @@ fi
|
|||||||
|
|
||||||
get_current_profile() {
|
get_current_profile() {
|
||||||
local profile
|
local profile
|
||||||
profile="$(kanshictl status | jq -er '.current_profile')"
|
profile="${$(kanshictl status):17}"
|
||||||
(( $? )) && return 1
|
(( $? )) && return 1
|
||||||
: ${(P)1::="${profile}"}
|
: ${(P)1::="${profile}"}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import shutil
|
|||||||
import re
|
import re
|
||||||
import threading
|
import threading
|
||||||
from subprocess import Popen, run, DEVNULL, PIPE
|
from subprocess import Popen, run, DEVNULL, PIPE
|
||||||
|
import sys
|
||||||
|
|
||||||
USBGUARD_EXEC_NAME = shutil.which("usbguard")
|
USBGUARD_EXEC_NAME = shutil.which("usbguard")
|
||||||
DUNSTIFY_EXEC_NAME = shutil.which("dunstify")
|
DUNSTIFY_EXEC_NAME = shutil.which("dunstify")
|
||||||
@ -12,14 +13,16 @@ open_notifications = {}
|
|||||||
|
|
||||||
def parse_event_type_and_id(stream):
|
def parse_event_type_and_id(stream):
|
||||||
line = stream.readline()
|
line = stream.readline()
|
||||||
|
if not line:
|
||||||
|
return None, True
|
||||||
if not line.startswith("[device] "):
|
if not line.startswith("[device] "):
|
||||||
return None
|
return None, False
|
||||||
event_type = re.findall("(?<=\\[device\\] )[a-zA-Z]+", line)
|
event_type = re.findall("(?<=\\[device\\] )[a-zA-Z]+", line)
|
||||||
if len(event_type) == 0:
|
if len(event_type) == 0:
|
||||||
return None
|
return None, False
|
||||||
event_id = re.findall("(?<=id=)[0-9]+", line)
|
event_id = re.findall("(?<=id=)[0-9]+", line)
|
||||||
if len(event_id) == 0:
|
if len(event_id) == 0:
|
||||||
return None
|
return None, False
|
||||||
return event_type[0], int(event_id[0])
|
return event_type[0], int(event_id[0])
|
||||||
|
|
||||||
|
|
||||||
@ -101,9 +104,15 @@ with Popen(
|
|||||||
bufsize=0,
|
bufsize=0,
|
||||||
) as usbguard_proc:
|
) as usbguard_proc:
|
||||||
new_devices = set()
|
new_devices = set()
|
||||||
usbguard_proc.stdout.readline() # get rid of initial connection message
|
first_line = (
|
||||||
|
usbguard_proc.stdout.readline()
|
||||||
|
) # get rid of initial connection message
|
||||||
|
if not first_line:
|
||||||
|
sys.exit()
|
||||||
while True:
|
while True:
|
||||||
event_type_result = parse_event_type_and_id(usbguard_proc.stdout)
|
event_type_result, eof = parse_event_type_and_id(usbguard_proc.stdout)
|
||||||
|
if eof:
|
||||||
|
sys.exit()
|
||||||
if event_type_result is None:
|
if event_type_result is None:
|
||||||
continue
|
continue
|
||||||
event_type, dev_id = event_type_result
|
event_type, dev_id = event_type_result
|
||||||
|
|||||||
Reference in New Issue
Block a user