feat: window resizer command & daemon (#43)

* resizer: add window resizer daemon command

Implements a continuous window resizer daemon that automatically resizes
windows based on configurable rules. Features include:

- Listens to Hyprland socket events for real-time window detection
- Supports multiple match types: initial_title, title_contains, title_exact
- Configurable via CLI config file with fallback to sensible defaults
- Rate limiting to prevent excessive resize operations
- Window actions: float, center, and custom dimensions
- Integration with existing CLI structure

Usage: caelestia resizer --daemon

* refactor: replace pip daemon with integrated resizer functionality

## Summary
- Remove standalone pip daemon and integrate its functionality into the resizer
- Add regex matching, config support, and active mode to resizer
- Implement clean 'caelestia resizer pip' command for quick PiP operations
- Update keybinds to use new unified resizer command

## Why Replace the Old PiP Method?

### 1. Code Duplication
The old pip daemon duplicated window management logic that already existed in the resizer:
- Both daemons listened to Hyprland socket events
- Both had similar window detection and manipulation code
- Both needed rate limiting and error handling

### 2. Limited Functionality
The old pip daemon was restricted:
- Only worked with regex pattern matching for 'Picture in Picture' titles
- No configuration support for custom rules
- No way to apply PiP to arbitrary windows
- No integration with other window actions

### 3. Maintenance Overhead
Having two separate daemons created maintenance issues:
- Two different codebases to maintain and debug
- Potential conflicts when both daemons run simultaneously
- Inconsistent error handling and logging approaches

### 4. Review Feedback Implementation
The PR review specifically requested this consolidation:
- "This can actually probably replace the pip daemon entirely"
- "consider adding a regex match mode and pip action, then add that to the default rules"

## New Integrated Approach Benefits

### 1. Unified Window Management
- Single daemon handles all window operations (resize, float, center, pip)
- Consistent configuration format using camelCase
- Shared error handling and rate limiting

### 2. Enhanced PiP Functionality
- Works with any window title pattern (regex, contains, exact)
- Configurable through CLI config file
- Active mode: `caelestia resizer pip` for quick PiP on current window
- Better error messages and user guidance

### 3. Future-Proof Architecture
- Easy to add new window actions (e.g., minimize, maximize, workspace move)
- Extensible pattern matching (could add class-based matching)
- Single place to implement new Hyprland features

### 4. Improved User Experience
- Simpler command structure: `caelestia resizer pip` vs complex arguments
- Better error messages when windows aren't floating
- Consistent CLI interface across all window operations

## Implementation Details
- Added pip action to WindowRule system
- Integrated original pip calculation with minimum size constraints
- Added type safety improvements throughout
- Maintained backward compatibility for existing users
- Updated keybind: `bind = $kbWindowPip, exec, caelestia resizer pip`

* fix: unpack dispatch_commands list in hypr.batch call

- Fix 'sequence item 0: expected str instance, list found' error
- hypr.batch() expects individual string arguments, not a list
- Use *dispatch_commands to unpack the list properly

* fix: handle Hyprland event format with triple > separators

- Fix window ID parsing for events with >>> instead of >>
- Add .lstrip('>') to remove any leading > characters
- Support both >> and >>> formats for compatibility
- Fixes 'Invalid window ID format: >555ee935ba30' errors

* resizer: implement active mode for all matching windows

Active mode now searches through all open windows and applies the rule to
any that match the specified pattern, rather than just checking if the
currently active window matches. This allows for batch operations on
multiple windows with the same pattern.

Special case: using pattern "active" will still target only the currently
active window, allowing users to apply rules to just the focused window
when needed.

This addresses the latest review feedback requesting that active mode work
on any open window that matches the given pattern.

* parser: better resizer help

* completions: add for resizer

---------

Co-authored-by: 2 * r + 2 * t <61896496+soramanew@users.noreply.github.com>
This commit is contained in:
Batuhan Edgüer
2025-08-18 10:39:35 +03:00
committed by GitHub
parent 3e19fd6919
commit c72223a7e6
4 changed files with 489 additions and 55 deletions
+20 -5
View File
@@ -1,6 +1,6 @@
import argparse
from caelestia.subcommands import clipboard, emoji, pip, record, scheme, screenshot, shell, toggle, wallpaper
from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper
from caelestia.utils.paths import wallpapers_dir
from caelestia.utils.scheme import get_scheme_names, scheme_variants
from caelestia.utils.wallpaper import get_wallpaper
@@ -106,9 +106,24 @@ def parse_args() -> (argparse.ArgumentParser, argparse.Namespace):
help="do not automatically change the scheme mode based on wallpaper colour",
)
# Create parser for pip opts
pip_parser = command_parser.add_parser("pip", help="picture in picture utilities")
pip_parser.set_defaults(cls=pip.Command)
pip_parser.add_argument("-d", "--daemon", action="store_true", help="start the daemon")
# Create parser for resizer opts
resizer_parser = command_parser.add_parser("resizer", help="window resizer daemon")
resizer_parser.set_defaults(cls=resizer.Command)
resizer_parser.add_argument("-d", "--daemon", action="store_true", help="start the resizer daemon")
resizer_parser.add_argument(
"pattern",
nargs="?",
help="pattern to match against windows ('active' for current window only, 'pip' for quick pip mode)",
)
resizer_parser.add_argument(
"match_type",
nargs="?",
metavar="match_type",
choices=["titleContains", "titleExact", "titleRegex", "initialTitle"],
help="type of pattern matching (titleContains,titleExact,titleRegex,initialTitle)",
)
resizer_parser.add_argument("width", nargs="?", help="width to resize to")
resizer_parser.add_argument("height", nargs="?", help="height to resize to")
resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)")
return parser, parser.parse_args()
-46
View File
@@ -1,46 +0,0 @@
import re
import socket
from argparse import Namespace
from caelestia.utils import hypr
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.daemon:
self.daemon()
else:
win = hypr.message("activewindow")
if win["floating"]:
self.handle_window(win["address"], win["workspace"]["name"])
def handle_window(self, address: str, ws: str) -> None:
mon_id = next(w for w in hypr.message("workspaces") if w["name"] == ws)["monitorID"]
mon = next(m for m in hypr.message("monitors") if m["id"] == mon_id)
width, height = next(c for c in hypr.message("clients") if c["address"] == address)["size"]
scale_factor = mon["height"] / 4 / height
scaled_win_size = f"{int(width * scale_factor)} {int(height * scale_factor)}"
off = min(mon["width"], mon["height"]) * 0.03
move_to = f"{int(mon['x']) + int(mon['width'] - off - width * scale_factor)} {int(mon['y']) + int(mon['height'] - off - height * scale_factor)}"
hypr.batch(
f"dispatch resizewindowpixel exact {scaled_win_size},address:{address}",
f"dispatch movewindowpixel exact {move_to},address:{address}",
)
def daemon(self) -> None:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(hypr.socket2_path)
while True:
data = sock.recv(4096).decode()
if data.startswith("openwindow>>"):
address, ws, cls, title = data[12:].split(",")
if re.match(r"^[Pp]icture(-| )in(-| )[Pp]icture$", title):
self.handle_window(f"0x{address}", ws)
+463
View File
@@ -0,0 +1,463 @@
import json
import re
import socket
import time
from argparse import Namespace
from pathlib import Path
from typing import Any, Dict, Optional
from caelestia.utils import hypr
from caelestia.utils.paths import user_config_path
class WindowRule:
def __init__(self, name: str, match_type: str, width: str, height: str, actions: list[str]):
self.name = name
self.match_type = match_type
self.width = width
self.height = height
self.actions = actions
class Command:
def __init__(self, args: Namespace) -> None:
self.args = args
self.timeout_tracker: dict[str, float] = {}
self.window_rules = self._load_window_rules()
def _load_window_rules(self) -> list[WindowRule]:
default_rules = [
WindowRule("(Bitwarden", "titleContains", "20%", "54%", ["float", "center"]),
WindowRule("Sign in - Google Accounts", "titleContains", "35%", "65%", ["float", "center"]),
WindowRule("oauth", "titleContains", "30%", "60%", ["float", "center"]),
WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]),
]
try:
config = json.loads(user_config_path.read_text())
if "resizer" in config and "rules" in config["resizer"]:
rules = []
for rule_config in config["resizer"]["rules"]:
rules.append(
WindowRule(
rule_config["name"],
rule_config["matchType"],
rule_config["width"],
rule_config["height"],
rule_config["actions"],
)
)
return rules
except (json.JSONDecodeError, KeyError):
self._log_message("ERROR: invalid config")
except FileNotFoundError:
pass
return default_rules
def _log_message(self, message: str) -> None:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def _is_rate_limited(self, key: str) -> bool:
current_time = time.time()
last_time = self.timeout_tracker.get(key, 0)
if current_time < last_time + 1:
return True
self.timeout_tracker[key] = current_time
return False
def _get_window_info(self, window_id: str) -> Optional[Dict[str, Any]]:
try:
clients = hypr.message("clients")
if isinstance(clients, list):
for client in clients:
if isinstance(client, dict) and client.get("address") == f"0x{window_id}":
return client
except Exception:
pass
return None
def _apply_pip_action(self, window_id: str) -> None:
try:
address = f"0x{window_id}"
clients_result = hypr.message("clients")
if not isinstance(clients_result, list):
return
window = None
for c in clients_result:
if isinstance(c, dict) and c.get("address") == address:
window = c
break
if not window or not isinstance(window, dict) or not window.get("floating", False):
return
workspaces_result = hypr.message("workspaces")
if not isinstance(workspaces_result, list):
return
workspace_info = window.get("workspace")
if not isinstance(workspace_info, dict):
return
workspace_name = workspace_info.get("name")
workspace = None
for w in workspaces_result:
if isinstance(w, dict) and w.get("name") == workspace_name:
workspace = w
break
if not workspace or not isinstance(workspace, dict):
return
monitors_result = hypr.message("monitors")
if not isinstance(monitors_result, list):
return
monitor_id = workspace.get("monitorID")
monitor = None
for m in monitors_result:
if isinstance(m, dict) and m.get("id") == monitor_id:
monitor = m
break
if not monitor or not isinstance(monitor, dict):
return
window_size = window.get("size")
if not isinstance(window_size, list) or len(window_size) < 2:
return
width, height = window_size[0], window_size[1]
if not isinstance(width, (int, float)) or not isinstance(height, (int, float)):
return
monitor_height = monitor.get("height")
monitor_width = monitor.get("width")
monitor_x = monitor.get("x")
monitor_y = monitor.get("y")
if not all(isinstance(x, (int, float)) for x in [monitor_height, monitor_width, monitor_x, monitor_y]):
return
scale_factor = monitor_height / 4 / height
scaled_width = int(width * scale_factor)
scaled_height = int(height * scale_factor)
# Ensure minimum reasonable size
min_width = 200
min_height = 150
scaled_width = max(scaled_width, min_width)
scaled_height = max(scaled_height, min_height)
# Use offset to ensure window stays on screen with some margin
offset = min(monitor_width, monitor_height) * 0.03
# Position in bottom-right corner with offset
move_x = monitor_x + monitor_width - scaled_width - offset
move_y = monitor_y + monitor_height - scaled_height - offset
command1 = f"dispatch resizewindowpixel exact {scaled_width} {scaled_height},address:{address}"
command2 = f"dispatch movewindowpixel exact {int(move_x)} {int(move_y)},address:{address}"
hypr.batch(command1, command2)
self._log_message(
f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})"
)
except Exception as e:
self._log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}")
def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool:
dispatch_commands = []
if "float" in actions:
window_info = self._get_window_info(window_id)
if window_info and not window_info.get("floating", False):
dispatch_commands.append(f"dispatch togglefloating address:0x{window_id}")
if "pip" in actions:
self._apply_pip_action(window_id)
return True
dispatch_commands.append(f"dispatch resizewindowpixel exact {width} {height},address:0x{window_id}")
if "center" in actions:
dispatch_commands.append("dispatch centerwindow")
try:
hypr.batch(*dispatch_commands)
self._log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
return True
except Exception as e:
self._log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}")
return False
def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None:
for rule in self.window_rules:
if rule.match_type == "initialTitle":
if initial_title == rule.name:
return rule
elif rule.match_type == "titleContains":
if rule.name in window_title:
return rule
elif rule.match_type == "titleExact":
if window_title == rule.name:
return rule
elif rule.match_type == "titleRegex":
try:
if re.search(rule.name, window_title):
return rule
except re.error:
self._log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'")
return None
def _handle_window_event(self, event: str) -> None:
if event.startswith("windowtitle"):
self._handle_title_event(event)
elif event.startswith("openwindow"):
self._handle_open_event(event)
def _handle_title_event(self, event: str) -> None:
try:
# Handle both >> and >>> separators (different Hyprland versions)
if ">>>" in event:
window_id = event.split(">>>")[1].split(",")[0]
else:
window_id = event.split(">>")[1].split(",")[0]
# Remove any leading > characters
window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id):
self._log_message(f"ERROR: Invalid window ID format: {window_id}")
return
window_info = self._get_window_info(window_id)
if not window_info:
return
window_title = window_info.get("title", "")
initial_title = window_info.get("initialTitle", "")
self._log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
rule = self._match_window_rule(window_title, initial_title)
if rule:
if self._is_rate_limited(window_id):
self._log_message(f"Rate limited: skipping window 0x{window_id}")
return
self._log_message(f"Matched rule '{rule.name}' for window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e:
self._log_message(f"ERROR: Failed to parse window title event: {e}")
def _handle_open_event(self, event: str) -> None:
try:
# Handle both >> and >>> separators
if "openwindow>>>" in event:
data = event[13:] # Remove "openwindow>>>"
else:
data = event[12:] # Remove "openwindow>>"
window_id, workspace, window_class, title = data.split(",", 3)
# Remove any leading > characters
window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id):
self._log_message(f"ERROR: Invalid window ID format: {window_id}")
return
self._log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
rule = self._match_window_rule(title, title)
if rule:
if self._is_rate_limited(window_id):
self._log_message(f"Rate limited: skipping window 0x{window_id}")
return
self._log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e:
self._log_message(f"ERROR: Failed to parse window open event: {e}")
def run(self) -> None:
if self.args.daemon:
self._run_daemon()
elif hasattr(self.args, "pattern") and self.args.pattern == "pip":
self._run_pip_mode()
elif all(
hasattr(self.args, attr) and getattr(self.args, attr)
for attr in ["pattern", "match_type", "width", "height", "actions"]
):
self._run_active_mode()
else:
print(
"Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode"
)
def _run_pip_mode(self) -> None:
"""Quick pip mode - applies pip action to the active window if it's floating"""
try:
active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found")
return
address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address")
return
window_id = address[2:] # Remove "0x" prefix
window_title = active_window_result.get("title", "")
if not active_window_result.get("floating", False):
print(f"Window '{window_title}' is not floating. PIP only works on floating windows.")
print("Try making it floating first with: hyprctl dispatch togglefloating")
return
print(f"Applying PIP to active window: '{window_title}'")
self._apply_pip_action(window_id)
print("PIP applied successfully")
except Exception as e:
print(f"ERROR: Failed to apply PIP to active window: {e}")
def _run_active_mode(self) -> None:
try:
# Create a temporary rule from command line arguments
actions = self.args.actions.split(",") if self.args.actions else []
temp_rule = WindowRule(self.args.pattern, self.args.match_type, self.args.width, self.args.height, actions)
# Special case: "active" pattern means only target the currently active window
if temp_rule.name.lower() == "active":
self._apply_to_active_window(temp_rule)
return
# Find all windows that match the pattern
matching_windows = self._find_matching_windows(temp_rule)
if not matching_windows:
print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
return
print(f"Found {len(matching_windows)} matching window(s)")
# Apply rule to all matching windows
success_count = 0
for window in matching_windows:
window_id = window["address"][2:] # Remove "0x" prefix
window_title = window.get("title", "")
print(f"Applying rule to window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success:
success_count += 1
print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
except Exception as e:
print(f"ERROR: Failed to apply rule: {e}")
def _apply_to_active_window(self, temp_rule: WindowRule) -> None:
"""Apply rule only to the currently active window"""
try:
active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found")
return
window_title = active_window_result.get("title", "")
address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address")
return
window_id = address[2:] # Remove "0x" prefix
print(f"Applying rule to active window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success:
print("Rule applied successfully")
else:
print("Failed to apply rule")
except Exception as e:
print(f"ERROR: Failed to apply rule to active window: {e}")
def _find_matching_windows(self, temp_rule: WindowRule) -> list:
"""Find all windows that match the given rule pattern"""
try:
clients_result = hypr.message("clients")
if not isinstance(clients_result, list):
return []
matching_windows = []
for window in clients_result:
if not isinstance(window, dict):
continue
window_title = window.get("title", "")
initial_title = window.get("initialTitle", "")
# Check if window matches the pattern
matches = False
if temp_rule.match_type == "initialTitle":
matches = initial_title == temp_rule.name
elif temp_rule.match_type == "titleContains":
matches = temp_rule.name in window_title
elif temp_rule.match_type == "titleExact":
matches = window_title == temp_rule.name
elif temp_rule.match_type == "titleRegex":
try:
matches = bool(re.search(temp_rule.name, window_title))
except re.error:
print(f"ERROR: Invalid regex pattern '{temp_rule.name}'")
return []
if matches:
matching_windows.append(window)
return matching_windows
except Exception as e:
print(f"ERROR: Failed to find matching windows: {e}")
return []
def _run_daemon(self) -> None:
self._log_message("Hyprland window resizer started")
self._log_message(f"Loaded {len(self.window_rules)} window rules")
socket_path = Path(hypr.socket2_path)
if not socket_path.exists():
self._log_message(f"ERROR: Hyprland socket not found at {socket_path}")
return
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(hypr.socket2_path)
self._log_message("Connected to Hyprland socket, listening for events...")
while True:
data = sock.recv(4096).decode()
if data:
for line in data.strip().split("\n"):
if line:
self._handle_window_event(line)
except KeyboardInterrupt:
self._log_message("Resizer daemon stopped")
except Exception as e:
self._log_message(f"ERROR: {e}")