refactor: make resizer use new loggers

This commit is contained in:
2 * r + 2 * t
2026-06-13 02:49:32 +10:00
parent c236823b76
commit 1c707d3a16
2 changed files with 44 additions and 54 deletions
+43 -47
View File
@@ -7,7 +7,7 @@ from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from caelestia.utils import hypr from caelestia.utils import hypr
from caelestia.utils.io import log_message from caelestia.utils.io import error, fatal, info, log, warn
from caelestia.utils.paths import get_config from caelestia.utils.paths import get_config
@@ -68,7 +68,7 @@ class Command:
) )
return rules return rules
except KeyError: except KeyError:
log_message("ERROR: invalid config") warn("invalid config, falling back to default rules")
except FileNotFoundError: except FileNotFoundError:
pass pass
@@ -188,12 +188,10 @@ class Command:
command2 = self._make_move_cmd(int(move_x), int(move_y), address) command2 = self._make_move_cmd(int(move_x), int(move_y), address)
hypr.batch(command1, command2) hypr.batch(command1, command2)
log_message( info(f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})")
f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})"
)
except Exception as e: except Exception as e:
log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}") error(f"failed to apply PiP action to window 0x{window_id}: {e}")
def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool: def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool:
dispatch_commands = [] dispatch_commands = []
@@ -214,10 +212,10 @@ class Command:
try: try:
hypr.batch(*dispatch_commands) hypr.batch(*dispatch_commands)
log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") info(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
return True return True
except Exception as e: except Exception as e:
log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}") error(f"failed to apply window actions for window 0x{window_id}: {e}")
return False return False
def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None: def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None:
@@ -236,7 +234,7 @@ class Command:
if re.search(rule.name, window_title): if re.search(rule.name, window_title):
return rule return rule
except re.error: except re.error:
log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'") warn(f"invalid regex pattern in rule '{rule.name}'")
return None return None
@@ -258,7 +256,7 @@ class Command:
window_id = window_id.lstrip(">") window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id): if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}") warn(f"invalid window ID format: {window_id}")
return return
window_info = self._get_window_info(window_id) window_info = self._get_window_info(window_id)
@@ -268,19 +266,19 @@ class Command:
window_title = window_info.get("title", "") window_title = window_info.get("title", "")
initial_title = window_info.get("initialTitle", "") initial_title = window_info.get("initialTitle", "")
log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") log(f"Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
rule = self._match_window_rule(window_title, initial_title) rule = self._match_window_rule(window_title, initial_title)
if rule: if rule:
if self._is_rate_limited(window_id): if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}") log(f"Rate limited: skipping window 0x{window_id}")
return return
log_message(f"Matched rule '{rule.name}' for window 0x{window_id}") info(f"Matched rule '{rule.name}' for window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window title event: {e}") warn(f"failed to parse window title event: {e}")
def _handle_open_event(self, event: str) -> None: def _handle_open_event(self, event: str) -> None:
try: try:
@@ -296,22 +294,22 @@ class Command:
window_id = window_id.lstrip(">") window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id): if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}") warn(f"invalid window ID format: {window_id}")
return return
log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") log(f"New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
rule = self._match_window_rule(title, title) rule = self._match_window_rule(title, title)
if rule: if rule:
if self._is_rate_limited(window_id): if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}") log(f"Rate limited: skipping window 0x{window_id}")
return return
log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}") info(f"Matched rule '{rule.name}' for new window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window open event: {e}") warn(f"failed to parse window open event: {e}")
def run(self) -> None: def run(self) -> None:
if self.args.daemon: if self.args.daemon:
@@ -324,7 +322,7 @@ class Command:
): ):
self._run_active_mode() self._run_active_mode()
else: else:
print( info(
"Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode" "Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode"
) )
@@ -333,28 +331,27 @@ class Command:
try: try:
active_window_result = hypr.message("activewindow") active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"): if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found") error("no active window found")
return return
address = active_window_result.get("address", "") address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"): if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address") error("invalid window address")
return return
window_id = address[2:] # Remove "0x" prefix window_id = address[2:] # Remove "0x" prefix
window_title = active_window_result.get("title", "") window_title = active_window_result.get("title", "")
if not active_window_result.get("floating", False): if not active_window_result.get("floating", False):
print(f"Window '{window_title}' is not floating. PIP only works on floating windows.") warn(f"window '{window_title}' is not floating; PiP only works on floating windows.")
print("Try making it floating first with: hyprctl dispatch togglefloating")
return return
print(f"Applying PIP to active window: '{window_title}'") info(f"Applying PiP to active window: '{window_title}'")
self._apply_pip_action(window_id) self._apply_pip_action(window_id)
print("PIP applied successfully") info("PiP applied successfully")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply PIP to active window: {e}") error(f"failed to apply PiP to active window: {e}")
def _run_active_mode(self) -> None: def _run_active_mode(self) -> None:
try: try:
@@ -371,10 +368,10 @@ class Command:
matching_windows = self._find_matching_windows(temp_rule) matching_windows = self._find_matching_windows(temp_rule)
if not matching_windows: if not matching_windows:
print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") warn(f"no windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
return return
print(f"Found {len(matching_windows)} matching window(s)") info(f"Found {len(matching_windows)} matching window(s)")
# Apply rule to all matching windows # Apply rule to all matching windows
success_count = 0 success_count = 0
@@ -382,41 +379,41 @@ class Command:
window_id = window["address"][2:] # Remove "0x" prefix window_id = window["address"][2:] # Remove "0x" prefix
window_title = window.get("title", "") window_title = window.get("title", "")
print(f"Applying rule to window 0x{window_id}: '{window_title}'") info(f"Applying rule to window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success: if success:
success_count += 1 success_count += 1
print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") info(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply rule: {e}") error(f"failed to apply rule: {e}")
def _apply_to_active_window(self, temp_rule: WindowRule) -> None: def _apply_to_active_window(self, temp_rule: WindowRule) -> None:
"""Apply rule only to the currently active window""" """Apply rule only to the currently active window"""
try: try:
active_window_result = hypr.message("activewindow") active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"): if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found") error("no active window found")
return return
window_title = active_window_result.get("title", "") window_title = active_window_result.get("title", "")
address = active_window_result.get("address", "") address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"): if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address") error("invalid window address")
return return
window_id = address[2:] # Remove "0x" prefix window_id = address[2:] # Remove "0x" prefix
print(f"Applying rule to active window 0x{window_id}: '{window_title}'") info(f"Applying rule to active window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success: if success:
print("Rule applied successfully") info("Rule applied successfully")
else: else:
print("Failed to apply rule") error("failed to apply rule")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply rule to active window: {e}") error(f"failed to apply rule to active window: {e}")
def _find_matching_windows(self, temp_rule: WindowRule) -> list: def _find_matching_windows(self, temp_rule: WindowRule) -> list:
"""Find all windows that match the given rule pattern""" """Find all windows that match the given rule pattern"""
@@ -445,7 +442,7 @@ class Command:
try: try:
matches = bool(re.search(temp_rule.name, window_title)) matches = bool(re.search(temp_rule.name, window_title))
except re.error: except re.error:
print(f"ERROR: Invalid regex pattern '{temp_rule.name}'") warn(f"invalid regex pattern '{temp_rule.name}'")
return [] return []
if matches: if matches:
@@ -454,23 +451,22 @@ class Command:
return matching_windows return matching_windows
except Exception as e: except Exception as e:
print(f"ERROR: Failed to find matching windows: {e}") error(f"failed to find matching windows: {e}")
return [] return []
def _run_daemon(self) -> None: def _run_daemon(self) -> None:
log_message("Hyprland window resizer started") info("Hyprland window resizer started")
log_message(f"Loaded {len(self.window_rules)} window rules") info(f"Loaded {len(self.window_rules)} window rules")
socket_path = Path(hypr.socket2_path) socket_path = Path(hypr.socket2_path)
if not socket_path.exists(): if not socket_path.exists():
log_message(f"ERROR: Hyprland socket not found at {socket_path}") fatal(f"Hyprland socket not found at {socket_path}")
return
try: try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(hypr.socket2_path) sock.connect(hypr.socket2_path)
log_message("Connected to Hyprland socket, listening for events...") info("Connected to Hyprland socket, listening for events...")
while True: while True:
data = sock.recv(4096).decode() data = sock.recv(4096).decode()
@@ -480,6 +476,6 @@ class Command:
self._handle_window_event(line) self._handle_window_event(line)
except KeyboardInterrupt: except KeyboardInterrupt:
log_message("Resizer daemon stopped") info("Resizer daemon stopped")
except Exception as e: except Exception as e:
log_message(f"ERROR: {e}") error(str(e))
+1 -7
View File
@@ -1,10 +1,4 @@
import sys import sys
from time import strftime
def log_message(message: str) -> None:
timestamp = strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def log_exception(func): def log_exception(func):
@@ -18,7 +12,7 @@ def log_exception(func):
try: try:
func(*args, **kwargs) func(*args, **kwargs)
except Exception as e: except Exception as e:
log_message(f'Error during execution of "{func.__name__}()": {str(e)}') error(f'exception during "{func.__name__}()": {str(e)}')
return wrapper return wrapper