From 1c707d3a169471ebe19c43a31c6287d34f2a0247 Mon Sep 17 00:00:00 2001 From: 2 * r + 2 * t <61896496+soramanew@users.noreply.github.com> Date: Sat, 13 Jun 2026 02:49:32 +1000 Subject: [PATCH] refactor: make resizer use new loggers --- src/caelestia/subcommands/resizer.py | 90 +++++++++++++--------------- src/caelestia/utils/io.py | 8 +-- 2 files changed, 44 insertions(+), 54 deletions(-) diff --git a/src/caelestia/subcommands/resizer.py b/src/caelestia/subcommands/resizer.py index 01c25988..5edf3a35 100644 --- a/src/caelestia/subcommands/resizer.py +++ b/src/caelestia/subcommands/resizer.py @@ -7,7 +7,7 @@ from pathlib import Path from typing import Any, Dict, Optional from caelestia.utils import hypr -from caelestia.utils.io import log_message +from caelestia.utils.io import error, fatal, info, log, warn from caelestia.utils.paths import get_config @@ -68,7 +68,7 @@ class Command: ) return rules except KeyError: - log_message("ERROR: invalid config") + warn("invalid config, falling back to default rules") except FileNotFoundError: pass @@ -188,12 +188,10 @@ class Command: command2 = self._make_move_cmd(int(move_x), int(move_y), address) hypr.batch(command1, command2) - log_message( - f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})" - ) + info(f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})") except Exception as e: - log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}") + error(f"failed to apply PiP action to window 0x{window_id}: {e}") def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool: dispatch_commands = [] @@ -214,10 +212,10 @@ class Command: try: hypr.batch(*dispatch_commands) - log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") + info(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") return True except Exception as e: - log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}") + error(f"failed to apply window actions for window 0x{window_id}: {e}") return False def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None: @@ -236,7 +234,7 @@ class Command: if re.search(rule.name, window_title): return rule except re.error: - log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'") + warn(f"invalid regex pattern in rule '{rule.name}'") return None @@ -258,7 +256,7 @@ class Command: window_id = window_id.lstrip(">") if not all(c in "0123456789abcdefABCDEF" for c in window_id): - log_message(f"ERROR: Invalid window ID format: {window_id}") + warn(f"invalid window ID format: {window_id}") return window_info = self._get_window_info(window_id) @@ -268,19 +266,19 @@ class Command: window_title = window_info.get("title", "") initial_title = window_info.get("initialTitle", "") - log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") + log(f"Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") rule = self._match_window_rule(window_title, initial_title) if rule: if self._is_rate_limited(window_id): - log_message(f"Rate limited: skipping window 0x{window_id}") + log(f"Rate limited: skipping window 0x{window_id}") return - log_message(f"Matched rule '{rule.name}' for window 0x{window_id}") + info(f"Matched rule '{rule.name}' for window 0x{window_id}") self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) except (IndexError, ValueError) as e: - log_message(f"ERROR: Failed to parse window title event: {e}") + warn(f"failed to parse window title event: {e}") def _handle_open_event(self, event: str) -> None: try: @@ -296,22 +294,22 @@ class Command: window_id = window_id.lstrip(">") if not all(c in "0123456789abcdefABCDEF" for c in window_id): - log_message(f"ERROR: Invalid window ID format: {window_id}") + warn(f"invalid window ID format: {window_id}") return - log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") + log(f"New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") rule = self._match_window_rule(title, title) if rule: if self._is_rate_limited(window_id): - log_message(f"Rate limited: skipping window 0x{window_id}") + log(f"Rate limited: skipping window 0x{window_id}") return - log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}") + info(f"Matched rule '{rule.name}' for new window 0x{window_id}") self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) except (IndexError, ValueError) as e: - log_message(f"ERROR: Failed to parse window open event: {e}") + warn(f"failed to parse window open event: {e}") def run(self) -> None: if self.args.daemon: @@ -324,7 +322,7 @@ class Command: ): self._run_active_mode() else: - print( + info( "Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode" ) @@ -333,28 +331,27 @@ class Command: try: active_window_result = hypr.message("activewindow") if not isinstance(active_window_result, dict) or not active_window_result.get("address"): - print("ERROR: No active window found") + error("no active window found") return address = active_window_result.get("address", "") if not isinstance(address, str) or not address.startswith("0x"): - print("ERROR: Invalid window address") + error("invalid window address") return window_id = address[2:] # Remove "0x" prefix window_title = active_window_result.get("title", "") if not active_window_result.get("floating", False): - print(f"Window '{window_title}' is not floating. PIP only works on floating windows.") - print("Try making it floating first with: hyprctl dispatch togglefloating") + warn(f"window '{window_title}' is not floating; PiP only works on floating windows.") return - print(f"Applying PIP to active window: '{window_title}'") + info(f"Applying PiP to active window: '{window_title}'") self._apply_pip_action(window_id) - print("PIP applied successfully") + info("PiP applied successfully") except Exception as e: - print(f"ERROR: Failed to apply PIP to active window: {e}") + error(f"failed to apply PiP to active window: {e}") def _run_active_mode(self) -> None: try: @@ -371,10 +368,10 @@ class Command: matching_windows = self._find_matching_windows(temp_rule) if not matching_windows: - print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") + warn(f"no windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") return - print(f"Found {len(matching_windows)} matching window(s)") + info(f"Found {len(matching_windows)} matching window(s)") # Apply rule to all matching windows success_count = 0 @@ -382,41 +379,41 @@ class Command: window_id = window["address"][2:] # Remove "0x" prefix window_title = window.get("title", "") - print(f"Applying rule to window 0x{window_id}: '{window_title}'") + info(f"Applying rule to window 0x{window_id}: '{window_title}'") success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) if success: success_count += 1 - print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") + info(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") except Exception as e: - print(f"ERROR: Failed to apply rule: {e}") + error(f"failed to apply rule: {e}") def _apply_to_active_window(self, temp_rule: WindowRule) -> None: """Apply rule only to the currently active window""" try: active_window_result = hypr.message("activewindow") if not isinstance(active_window_result, dict) or not active_window_result.get("address"): - print("ERROR: No active window found") + error("no active window found") return window_title = active_window_result.get("title", "") address = active_window_result.get("address", "") if not isinstance(address, str) or not address.startswith("0x"): - print("ERROR: Invalid window address") + error("invalid window address") return window_id = address[2:] # Remove "0x" prefix - print(f"Applying rule to active window 0x{window_id}: '{window_title}'") + info(f"Applying rule to active window 0x{window_id}: '{window_title}'") success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) if success: - print("Rule applied successfully") + info("Rule applied successfully") else: - print("Failed to apply rule") + error("failed to apply rule") except Exception as e: - print(f"ERROR: Failed to apply rule to active window: {e}") + error(f"failed to apply rule to active window: {e}") def _find_matching_windows(self, temp_rule: WindowRule) -> list: """Find all windows that match the given rule pattern""" @@ -445,7 +442,7 @@ class Command: try: matches = bool(re.search(temp_rule.name, window_title)) except re.error: - print(f"ERROR: Invalid regex pattern '{temp_rule.name}'") + warn(f"invalid regex pattern '{temp_rule.name}'") return [] if matches: @@ -454,23 +451,22 @@ class Command: return matching_windows except Exception as e: - print(f"ERROR: Failed to find matching windows: {e}") + error(f"failed to find matching windows: {e}") return [] def _run_daemon(self) -> None: - log_message("Hyprland window resizer started") - log_message(f"Loaded {len(self.window_rules)} window rules") + info("Hyprland window resizer started") + info(f"Loaded {len(self.window_rules)} window rules") socket_path = Path(hypr.socket2_path) if not socket_path.exists(): - log_message(f"ERROR: Hyprland socket not found at {socket_path}") - return + fatal(f"Hyprland socket not found at {socket_path}") try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(hypr.socket2_path) - log_message("Connected to Hyprland socket, listening for events...") + info("Connected to Hyprland socket, listening for events...") while True: data = sock.recv(4096).decode() @@ -480,6 +476,6 @@ class Command: self._handle_window_event(line) except KeyboardInterrupt: - log_message("Resizer daemon stopped") + info("Resizer daemon stopped") except Exception as e: - log_message(f"ERROR: {e}") + error(str(e)) diff --git a/src/caelestia/utils/io.py b/src/caelestia/utils/io.py index 0786119a..612d954f 100644 --- a/src/caelestia/utils/io.py +++ b/src/caelestia/utils/io.py @@ -1,10 +1,4 @@ import sys -from time import strftime - - -def log_message(message: str) -> None: - timestamp = strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}") def log_exception(func): @@ -18,7 +12,7 @@ def log_exception(func): try: func(*args, **kwargs) except Exception as e: - log_message(f'Error during execution of "{func.__name__}()": {str(e)}') + error(f'exception during "{func.__name__}()": {str(e)}') return wrapper