mirror of
https://github.com/caelestia-dots/cli.git
synced 2026-06-05 14:59:29 -05:00
466 lines
18 KiB
Python
466 lines
18 KiB
Python
import json
|
|
import re
|
|
import socket
|
|
import time
|
|
from argparse import Namespace
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
from caelestia.utils import hypr
|
|
from caelestia.utils.logging import log_message
|
|
from caelestia.utils.paths import user_config_path
|
|
|
|
|
|
class WindowRule:
|
|
def __init__(self, name: str, match_type: str, width: str, height: str, actions: list[str]):
|
|
self.name = name
|
|
self.match_type = match_type
|
|
self.width = width
|
|
self.height = height
|
|
self.actions = actions
|
|
|
|
|
|
class Command:
|
|
def __init__(self, args: Namespace) -> None:
|
|
self.args = args
|
|
self.timeout_tracker: dict[str, float] = {}
|
|
self.window_rules = self._load_window_rules()
|
|
|
|
def _load_window_rules(self) -> list[WindowRule]:
|
|
default_rules = [
|
|
WindowRule("(Bitwarden", "titleContains", "20%", "54%", ["float", "center"]),
|
|
WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]),
|
|
]
|
|
|
|
try:
|
|
config = json.loads(user_config_path.read_text())
|
|
if "resizer" in config and "rules" in config["resizer"]:
|
|
rules = []
|
|
for rule_config in config["resizer"]["rules"]:
|
|
rules.append(
|
|
WindowRule(
|
|
rule_config["name"],
|
|
rule_config["matchType"],
|
|
rule_config["width"],
|
|
rule_config["height"],
|
|
rule_config["actions"],
|
|
)
|
|
)
|
|
return rules
|
|
except (json.JSONDecodeError, KeyError):
|
|
log_message("ERROR: invalid config")
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
return default_rules
|
|
|
|
def _is_rate_limited(self, key: str) -> bool:
|
|
current_time = time.time()
|
|
last_time = self.timeout_tracker.get(key, 0)
|
|
|
|
if current_time < last_time + 1:
|
|
return True
|
|
|
|
self.timeout_tracker[key] = current_time
|
|
return False
|
|
|
|
def _get_window_info(self, window_id: str) -> Optional[Dict[str, Any]]:
|
|
try:
|
|
clients = hypr.message("clients")
|
|
if isinstance(clients, list):
|
|
for client in clients:
|
|
if isinstance(client, dict) and client.get("address") == f"0x{window_id}":
|
|
return client
|
|
except Exception:
|
|
pass
|
|
|
|
return None
|
|
|
|
def _apply_pip_action(self, window_id: str) -> None:
|
|
try:
|
|
address = f"0x{window_id}"
|
|
clients_result = hypr.message("clients")
|
|
if not isinstance(clients_result, list):
|
|
return
|
|
|
|
window = None
|
|
for c in clients_result:
|
|
if isinstance(c, dict) and c.get("address") == address:
|
|
window = c
|
|
break
|
|
|
|
if not window or not isinstance(window, dict) or not window.get("floating", False):
|
|
return
|
|
|
|
workspaces_result = hypr.message("workspaces")
|
|
if not isinstance(workspaces_result, list):
|
|
return
|
|
|
|
workspace_info = window.get("workspace")
|
|
if not isinstance(workspace_info, dict):
|
|
return
|
|
|
|
workspace_name = workspace_info.get("name")
|
|
workspace = None
|
|
for w in workspaces_result:
|
|
if isinstance(w, dict) and w.get("name") == workspace_name:
|
|
workspace = w
|
|
break
|
|
|
|
if not workspace or not isinstance(workspace, dict):
|
|
return
|
|
|
|
monitors_result = hypr.message("monitors")
|
|
if not isinstance(monitors_result, list):
|
|
return
|
|
|
|
monitor_id = workspace.get("monitorID")
|
|
monitor = None
|
|
for m in monitors_result:
|
|
if isinstance(m, dict) and m.get("id") == monitor_id:
|
|
monitor = m
|
|
break
|
|
|
|
if not monitor or not isinstance(monitor, dict):
|
|
return
|
|
|
|
window_size = window.get("size")
|
|
if not isinstance(window_size, list) or len(window_size) < 2:
|
|
return
|
|
|
|
width, height = window_size[0], window_size[1]
|
|
if not isinstance(width, (int, float)) or not isinstance(height, (int, float)):
|
|
return
|
|
|
|
monitor_height = monitor.get("height")
|
|
monitor_width = monitor.get("width")
|
|
monitor_scale = monitor.get("scale")
|
|
monitor_x = monitor.get("x")
|
|
monitor_y = monitor.get("y")
|
|
|
|
if not all(
|
|
isinstance(x, (int, float))
|
|
for x in [monitor_height, monitor_width, monitor_scale, monitor_x, monitor_y]
|
|
):
|
|
return
|
|
|
|
monitor_height = monitor_height / monitor_scale
|
|
monitor_width = monitor_width / monitor_scale
|
|
|
|
scale_factor = monitor_height / 4 / height
|
|
scaled_width = int(width * scale_factor)
|
|
scaled_height = int(height * scale_factor)
|
|
|
|
# Ensure minimum reasonable size
|
|
min_width = 200
|
|
min_height = 150
|
|
scaled_width = max(scaled_width, min_width)
|
|
scaled_height = max(scaled_height, min_height)
|
|
|
|
# Use offset to ensure window stays on screen with some margin
|
|
offset = min(monitor_width, monitor_height) * 0.03
|
|
|
|
# Position in bottom-right corner with offset
|
|
move_x = monitor_x + monitor_width - scaled_width - offset
|
|
move_y = monitor_y + monitor_height - scaled_height - offset
|
|
|
|
command1 = f"dispatch resizewindowpixel exact {scaled_width} {scaled_height},address:{address}"
|
|
command2 = f"dispatch movewindowpixel exact {int(move_x)} {int(move_y)},address:{address}"
|
|
hypr.batch(command1, command2)
|
|
|
|
log_message(
|
|
f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})"
|
|
)
|
|
|
|
except Exception as e:
|
|
log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}")
|
|
|
|
def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool:
|
|
dispatch_commands = []
|
|
|
|
if "float" in actions:
|
|
window_info = self._get_window_info(window_id)
|
|
if window_info and not window_info.get("floating", False):
|
|
dispatch_commands.append(f"dispatch togglefloating address:0x{window_id}")
|
|
|
|
if "pip" in actions:
|
|
self._apply_pip_action(window_id)
|
|
return True
|
|
|
|
dispatch_commands.append(f"dispatch resizewindowpixel exact {width} {height},address:0x{window_id}")
|
|
|
|
if "center" in actions:
|
|
dispatch_commands.append("dispatch centerwindow")
|
|
|
|
try:
|
|
hypr.batch(*dispatch_commands)
|
|
log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
|
|
return True
|
|
except Exception as e:
|
|
log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}")
|
|
return False
|
|
|
|
def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None:
|
|
for rule in self.window_rules:
|
|
if rule.match_type == "initialTitle":
|
|
if initial_title == rule.name:
|
|
return rule
|
|
elif rule.match_type == "titleContains":
|
|
if rule.name in window_title:
|
|
return rule
|
|
elif rule.match_type == "titleExact":
|
|
if window_title == rule.name:
|
|
return rule
|
|
elif rule.match_type == "titleRegex":
|
|
try:
|
|
if re.search(rule.name, window_title):
|
|
return rule
|
|
except re.error:
|
|
log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'")
|
|
|
|
return None
|
|
|
|
def _handle_window_event(self, event: str) -> None:
|
|
if event.startswith("windowtitle"):
|
|
self._handle_title_event(event)
|
|
elif event.startswith("openwindow"):
|
|
self._handle_open_event(event)
|
|
|
|
def _handle_title_event(self, event: str) -> None:
|
|
try:
|
|
# Handle both >> and >>> separators (different Hyprland versions)
|
|
if ">>>" in event:
|
|
window_id = event.split(">>>")[1].split(",")[0]
|
|
else:
|
|
window_id = event.split(">>")[1].split(",")[0]
|
|
|
|
# Remove any leading > characters
|
|
window_id = window_id.lstrip(">")
|
|
|
|
if not all(c in "0123456789abcdefABCDEF" for c in window_id):
|
|
log_message(f"ERROR: Invalid window ID format: {window_id}")
|
|
return
|
|
|
|
window_info = self._get_window_info(window_id)
|
|
if not window_info:
|
|
return
|
|
|
|
window_title = window_info.get("title", "")
|
|
initial_title = window_info.get("initialTitle", "")
|
|
|
|
log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
|
|
|
|
rule = self._match_window_rule(window_title, initial_title)
|
|
if rule:
|
|
if self._is_rate_limited(window_id):
|
|
log_message(f"Rate limited: skipping window 0x{window_id}")
|
|
return
|
|
|
|
log_message(f"Matched rule '{rule.name}' for window 0x{window_id}")
|
|
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
|
|
|
|
except (IndexError, ValueError) as e:
|
|
log_message(f"ERROR: Failed to parse window title event: {e}")
|
|
|
|
def _handle_open_event(self, event: str) -> None:
|
|
try:
|
|
# Handle both >> and >>> separators
|
|
if "openwindow>>>" in event:
|
|
data = event[13:] # Remove "openwindow>>>"
|
|
else:
|
|
data = event[12:] # Remove "openwindow>>"
|
|
|
|
window_id, workspace, window_class, title = data.split(",", 3)
|
|
|
|
# Remove any leading > characters
|
|
window_id = window_id.lstrip(">")
|
|
|
|
if not all(c in "0123456789abcdefABCDEF" for c in window_id):
|
|
log_message(f"ERROR: Invalid window ID format: {window_id}")
|
|
return
|
|
|
|
log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
|
|
|
|
rule = self._match_window_rule(title, title)
|
|
if rule:
|
|
if self._is_rate_limited(window_id):
|
|
log_message(f"Rate limited: skipping window 0x{window_id}")
|
|
return
|
|
|
|
log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}")
|
|
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
|
|
|
|
except (IndexError, ValueError) as e:
|
|
log_message(f"ERROR: Failed to parse window open event: {e}")
|
|
|
|
def run(self) -> None:
|
|
if self.args.daemon:
|
|
self._run_daemon()
|
|
elif hasattr(self.args, "pattern") and self.args.pattern == "pip":
|
|
self._run_pip_mode()
|
|
elif all(
|
|
hasattr(self.args, attr) and getattr(self.args, attr)
|
|
for attr in ["pattern", "match_type", "width", "height", "actions"]
|
|
):
|
|
self._run_active_mode()
|
|
else:
|
|
print(
|
|
"Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode"
|
|
)
|
|
|
|
def _run_pip_mode(self) -> None:
|
|
"""Quick pip mode - applies pip action to the active window if it's floating"""
|
|
try:
|
|
active_window_result = hypr.message("activewindow")
|
|
if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
|
|
print("ERROR: No active window found")
|
|
return
|
|
|
|
address = active_window_result.get("address", "")
|
|
if not isinstance(address, str) or not address.startswith("0x"):
|
|
print("ERROR: Invalid window address")
|
|
return
|
|
|
|
window_id = address[2:] # Remove "0x" prefix
|
|
window_title = active_window_result.get("title", "")
|
|
|
|
if not active_window_result.get("floating", False):
|
|
print(f"Window '{window_title}' is not floating. PIP only works on floating windows.")
|
|
print("Try making it floating first with: hyprctl dispatch togglefloating")
|
|
return
|
|
|
|
print(f"Applying PIP to active window: '{window_title}'")
|
|
self._apply_pip_action(window_id)
|
|
print("PIP applied successfully")
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: Failed to apply PIP to active window: {e}")
|
|
|
|
def _run_active_mode(self) -> None:
|
|
try:
|
|
# Create a temporary rule from command line arguments
|
|
actions = self.args.actions.split(",") if self.args.actions else []
|
|
temp_rule = WindowRule(self.args.pattern, self.args.match_type, self.args.width, self.args.height, actions)
|
|
|
|
# Special case: "active" pattern means only target the currently active window
|
|
if temp_rule.name.lower() == "active":
|
|
self._apply_to_active_window(temp_rule)
|
|
return
|
|
|
|
# Find all windows that match the pattern
|
|
matching_windows = self._find_matching_windows(temp_rule)
|
|
|
|
if not matching_windows:
|
|
print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
|
|
return
|
|
|
|
print(f"Found {len(matching_windows)} matching window(s)")
|
|
|
|
# Apply rule to all matching windows
|
|
success_count = 0
|
|
for window in matching_windows:
|
|
window_id = window["address"][2:] # Remove "0x" prefix
|
|
window_title = window.get("title", "")
|
|
|
|
print(f"Applying rule to window 0x{window_id}: '{window_title}'")
|
|
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
|
|
if success:
|
|
success_count += 1
|
|
|
|
print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: Failed to apply rule: {e}")
|
|
|
|
def _apply_to_active_window(self, temp_rule: WindowRule) -> None:
|
|
"""Apply rule only to the currently active window"""
|
|
try:
|
|
active_window_result = hypr.message("activewindow")
|
|
if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
|
|
print("ERROR: No active window found")
|
|
return
|
|
|
|
window_title = active_window_result.get("title", "")
|
|
address = active_window_result.get("address", "")
|
|
if not isinstance(address, str) or not address.startswith("0x"):
|
|
print("ERROR: Invalid window address")
|
|
return
|
|
|
|
window_id = address[2:] # Remove "0x" prefix
|
|
|
|
print(f"Applying rule to active window 0x{window_id}: '{window_title}'")
|
|
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
|
|
if success:
|
|
print("Rule applied successfully")
|
|
else:
|
|
print("Failed to apply rule")
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: Failed to apply rule to active window: {e}")
|
|
|
|
def _find_matching_windows(self, temp_rule: WindowRule) -> list:
|
|
"""Find all windows that match the given rule pattern"""
|
|
try:
|
|
clients_result = hypr.message("clients")
|
|
if not isinstance(clients_result, list):
|
|
return []
|
|
|
|
matching_windows = []
|
|
for window in clients_result:
|
|
if not isinstance(window, dict):
|
|
continue
|
|
|
|
window_title = window.get("title", "")
|
|
initial_title = window.get("initialTitle", "")
|
|
|
|
# Check if window matches the pattern
|
|
matches = False
|
|
if temp_rule.match_type == "initialTitle":
|
|
matches = initial_title == temp_rule.name
|
|
elif temp_rule.match_type == "titleContains":
|
|
matches = temp_rule.name in window_title
|
|
elif temp_rule.match_type == "titleExact":
|
|
matches = window_title == temp_rule.name
|
|
elif temp_rule.match_type == "titleRegex":
|
|
try:
|
|
matches = bool(re.search(temp_rule.name, window_title))
|
|
except re.error:
|
|
print(f"ERROR: Invalid regex pattern '{temp_rule.name}'")
|
|
return []
|
|
|
|
if matches:
|
|
matching_windows.append(window)
|
|
|
|
return matching_windows
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: Failed to find matching windows: {e}")
|
|
return []
|
|
|
|
def _run_daemon(self) -> None:
|
|
log_message("Hyprland window resizer started")
|
|
log_message(f"Loaded {len(self.window_rules)} window rules")
|
|
|
|
socket_path = Path(hypr.socket2_path)
|
|
if not socket_path.exists():
|
|
log_message(f"ERROR: Hyprland socket not found at {socket_path}")
|
|
return
|
|
|
|
try:
|
|
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
|
|
sock.connect(hypr.socket2_path)
|
|
|
|
log_message("Connected to Hyprland socket, listening for events...")
|
|
|
|
while True:
|
|
data = sock.recv(4096).decode()
|
|
if data:
|
|
for line in data.strip().split("\n"):
|
|
if line:
|
|
self._handle_window_event(line)
|
|
|
|
except KeyboardInterrupt:
|
|
log_message("Resizer daemon stopped")
|
|
except Exception as e:
|
|
log_message(f"ERROR: {e}")
|