Merge pull request #123 from caelestia-dots/feat/add-install-cmd

feat: add install command
This commit is contained in:
2 * r + 2 * t
2026-06-16 01:33:40 +10:00
committed by GitHub
18 changed files with 1117 additions and 152 deletions
+9
View File
@@ -0,0 +1,9 @@
#!/usr/bin/env sh
export HOME=/tmp/install-test
export XDG_CONFIG_HOME=$HOME/.config
export XDG_DATA_HOME=$HOME/.local/share
export XDG_STATE_HOME=$HOME/.local/state
export XDG_CACHE_HOME=$HOME/.cache
"$@"
+8 -1
View File
@@ -1,7 +1,7 @@
set -l seen '__fish_seen_subcommand_from' set -l seen '__fish_seen_subcommand_from'
set -l has_opt '__fish_contains_opt' set -l has_opt '__fish_contains_opt'
set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install
set -l not_seen "not $seen $commands" set -l not_seen "not $seen $commands"
# Disable file completions # Disable file completions
@@ -20,6 +20,7 @@ complete -c caelestia -n $not_seen -a 'clipboard' -d 'Open clipboard history'
complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities' complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities'
complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper' complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper'
complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer' complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer'
complete -c caelestia -n $not_seen -a 'install' -d 'Install the Caelestia dotfiles'
# Shell # Shell
set -l commands mpris drawers wallpaper notifs set -l commands mpris drawers wallpaper notifs
@@ -126,3 +127,9 @@ complete -c caelestia -n "$seen emoji" -s 'f' -l 'fetch' -d 'Fetch emoji/glyph d
complete -c caelestia -n "$seen resizer" -s 'd' -l 'daemon' -d 'Start in daemon mode' complete -c caelestia -n "$seen resizer" -s 'd' -l 'daemon' -d 'Start in daemon mode'
complete -c caelestia -n "$seen resizer" -a 'pip' -d 'Quick pip mode' complete -c caelestia -n "$seen resizer" -a 'pip' -d 'Quick pip mode'
complete -c caelestia -n "$seen resizer" -a 'active' -d 'Select the active window' complete -c caelestia -n "$seen resizer" -a 'active' -d 'Select the active window'
# Install (component flags come from the manifest, so are not completed statically)
complete -c caelestia -n "$seen install" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r
complete -c caelestia -n "$seen install" -l 'enable-components' -d 'List of components to enable' -r
complete -c caelestia -n "$seen install" -l 'disable-components' -d 'List of components to disable' -r
complete -c caelestia -n "$seen install" -l 'noconfirm' -d 'Use defaults for all prompts'
+11 -7
View File
@@ -1,12 +1,16 @@
from caelestia.parser import parse_args from caelestia.parser import parse_args
from caelestia.utils.io import log
from caelestia.utils.version import print_version from caelestia.utils.version import print_version
def main() -> None: def main() -> None:
parser, args = parse_args() try:
if args.version: parser, args = parse_args()
print_version() if args.version:
elif "cls" in args: print_version()
args.cls(args).run() elif "cls" in args:
else: args.cls(args).run()
parser.print_help() else:
parser.print_help()
except KeyboardInterrupt:
log("Exiting...")
+68 -1
View File
@@ -1,6 +1,21 @@
import argparse import argparse
import sys
from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper from caelestia.subcommands import (
clipboard,
emoji,
install,
record,
resizer,
scheme,
screenshot,
shell,
toggle,
wallpaper,
)
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import warn
from caelestia.utils.paths import wallpapers_dir from caelestia.utils.paths import wallpapers_dir
from caelestia.utils.scheme import get_scheme_names, scheme_variants from caelestia.utils.scheme import get_scheme_names, scheme_variants
from caelestia.utils.wallpaper import get_wallpaper from caelestia.utils.wallpaper import get_wallpaper
@@ -128,4 +143,56 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
resizer_parser.add_argument("height", nargs="?", help="height to resize to") resizer_parser.add_argument("height", nargs="?", help="height to resize to")
resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)") resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)")
# Create parser for install opts
install_parser = command_parser.add_parser(
"install",
help="install the Caelestia dotfiles",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
install_parser.set_defaults(cls=install.Command)
install_parser.add_argument("--aur-helper", choices=["yay", "paru"], help="the AUR helper to use")
install_parser.add_argument(
"--enable-components", metavar="LIST", help="comma-separated list of components to enable"
)
install_parser.add_argument(
"--disable-components", metavar="LIST", help="comma-separated list of components to disable"
)
install_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
_set_install_epilog(install_parser)
return parser, parser.parse_args() return parser, parser.parse_args()
def _set_install_epilog(install_parser: argparse.ArgumentParser) -> None:
"""Add components if using install subcommand"""
if len(sys.argv) > 1 and sys.argv[1] == "install":
manifest = _load_install_manifest()
if manifest is not None and manifest.components:
install_parser.epilog = _components_epilog(manifest)
def _load_install_manifest() -> Manifest | None:
source = DotsSource()
try:
source.ensure()
return source.manifest_at(source.remote_ref)
except Exception as e:
warn(f"failed to load manifest from dots repo ({e})\n", prefix=False)
return None
def _components_epilog(manifest: Manifest) -> str:
def e(*v: int) -> str:
return f"\033[{';'.join(str(c) for c in v)}m"
def b(c: int) -> str:
return e(1, c)
reset = e(0)
width = max(len(name) for name in manifest.components)
lines = [f"{b(34)}available components (for --enable-components / --disable-components):{reset}"]
for name, comp in manifest.components.items():
lines.append(f" {b(32)}{name:<{width}}{reset}\t{'(default)' if comp.default else '(off)'}")
return "\n".join(lines)
+244
View File
@@ -0,0 +1,244 @@
import os
import shutil
import subprocess
import textwrap
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError, expand, expand_dests
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER, PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import PROMPT_COLOUR, confirm, disable_input, fatal, format_msg, info, log, pause, prompt, warn
from caelestia.utils.paths import (
config_backup_dir,
config_dir,
dots_dir,
)
def _parse_list_arg(value: str | None) -> list[str] | None:
if value is None:
return None
return [item.strip() for item in value.split(",") if item.strip()]
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.noconfirm:
disable_input()
self.print_greeting()
self.create_backup()
source, tip, manifest = self.fetch_manifest()
self.deploy_configs(source, manifest)
helper, packages, local_packages = self.install_packages(source, manifest)
self.run_hooks(manifest)
DotsState(
aur_helper=helper,
applied_rev=tip,
enabled_components=manifest.enabled_components,
packages=packages,
local_packages=local_packages,
).save()
self.print_done()
def print_greeting(self) -> None:
print(
"\033[38;2;150;241;241m" # Caelestia colour
+ textwrap.dedent(
r"""
╭─────────────────────────────────────────────────╮
│ ______ __ __ _ │
│ / ____/___ ____ / /__ _____/ /_(_)___ _ │
│ / / / __ `/ _ \/ / _ \/ ___/ __/ / __ `/ │
│ / /___/ /_/ / __/ / __(__ ) /_/ / /_/ / │
\____/\__,_/\___/_/\___/____/\__/_/\__,_/ │
│ │
╰─────────────────────────────────────────────────╯
"""
)
+ "\033[0m"
)
info("Welcome to the Caelestia dotfiles installer!")
info("Here's a quick overview on what this command is going to do:")
info(" - Install dependencies")
info(" - Install config files")
info("The installer does NOT set up hardware/system level configs (e.g. drivers). Please do this yourself.")
pause()
print()
def create_backup(self) -> None:
if config_dir.exists():
if not confirm("Back up the config directory?", default=True):
return
log(f"Creating a backup of {config_dir}...")
if config_backup_dir.exists():
if not confirm("A backup already exists, overwrite?", default=False):
info("Not creating backup.")
return
log("Deleting old backup...")
shutil.rmtree(config_backup_dir)
shutil.copytree(config_dir, config_backup_dir, symlinks=True)
info(f"Created backup at {config_backup_dir}")
def fetch_manifest(self) -> tuple[DotsSource, str, Manifest]:
print()
log("Fetching dots repo...")
source = DotsSource()
try:
source.ensure()
tip = source.checkout_tip()
except SourceError as e:
fatal(e)
enable = _parse_list_arg(self.args.enable_components)
disable = _parse_list_arg(self.args.disable_components)
try:
manifest = source.manifest_at(tip)
manifest.resolve_components(enable=enable, disable=disable)
if enable is None and disable is None:
self.prompt_optional_components(manifest)
except (SourceError, ManifestError, ComponentError) as e:
fatal(e)
names = ", ".join(manifest.enabled_components) or "none"
info(f"Enabled components: {names}")
return source, tip, manifest
def prompt_optional_components(self, manifest: Manifest) -> None:
comp_arr = manifest.disabled_components
if not comp_arr:
return
print(format_msg(PROMPT_COLOUR, True, "Components to enable?"))
max_idx_w = len(str(len(comp_arr)))
for i, comp in enumerate(comp_arr):
print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{comp}"))
print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)"))
def _valid_v(v: str) -> int:
try:
i_v = int(v, base=10) - 1 # -1 to translate to 0 index
except ValueError:
raise ValueError(f'Given value "{v}" must be an integer.')
if i_v < 0 or i_v >= len(comp_arr):
raise ValueError(f'Given value "{v}" must be between 1 and {len(comp_arr)} inclusive.')
return i_v
def _parse(ans: str) -> list[str] | None:
if ans in ("a", "all"):
return list(manifest.components)
if not ans:
return None
enabled: list[str] = []
for tok in ans.split():
fr, sep, to = tok.partition("-")
if sep:
fr = _valid_v(fr)
to = _valid_v(to)
if fr > to:
raise ValueError(f'Given range "{tok}" must be lo-hi.')
enabled += comp_arr[fr : to + 1]
elif tok.startswith("^"):
t = _valid_v(tok[1:])
enabled += comp_arr[:t] + comp_arr[t + 1 :]
else:
t = _valid_v(tok)
enabled.append(comp_arr[t])
return list(set(enabled))
while True:
ans = prompt("", end="").lower().strip()
try:
enabled = _parse(ans)
except ValueError as e:
warn(f"invalid input. {e} Please try again.")
continue
if enabled is not None:
manifest.resolve_components(enable=enabled)
return
def deploy_configs(self, source: DotsSource, manifest: Manifest) -> None:
print()
log("Installing configs...")
deployer = Deployer()
for entry in manifest.enabled_entries():
src = source.working_path(expand(entry.src))
if not src.exists():
warn(f"missing in source, skipping: {entry.src}")
continue
dests = expand_dests(entry.dest)
if not dests:
warn(f"dest glob matched nothing, skipping: {entry.dest}")
continue
for dest in dests:
deployer.place(src, Path(dest))
info(f"{entry.src} -> {dest}")
def install_packages(self, source: DotsSource, manifest: Manifest) -> tuple[str, list[str], dict[str, list[str]]]:
installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm)
packages = manifest.enabled_packages()
if packages:
print()
log("Installing packages...")
installer.install(packages)
local_packages = {}
local_dirs = manifest.enabled_local_packages()
if local_dirs:
print()
log("Building local packages...")
for path in local_dirs:
directory = source.working_path(path)
if not directory.is_dir():
warn(f"missing in repo, skipping: {path}")
continue
log(f"Building {path}...")
local_packages[path] = installer.build_install(directory)
return getattr(installer, "helper", DEFAULT_AUR_HELPER), packages, local_packages
def run_hooks(self, manifest: Manifest) -> None:
hooks = manifest.enabled_hooks("post_install")
if not hooks:
return
print()
log("Running post-install hooks...")
env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)}
for hook in hooks:
info(f"Running hook: {hook}")
result = subprocess.run(hook, shell=True, env=env)
if result.returncode != 0:
warn(f"hook exited with {result.returncode}")
def print_done(self) -> None:
print()
info("All done! Caelestia has been installed.")
info("A few things to finish up:")
info(" - A reboot is recommended for all changes take effect")
info(" - Edit `~/.config/caelestia/hypr-vars.conf` to set default apps, keybinds and much more")
info(" - Edit `~/.config/caelestia/hypr-user.conf` to set your monitor layout and other Hyprland configs")
info(" - Run `caelestia update` later to pull in the latest changes")
info("Enjoy! For support (or to just hang out), join our Discord server: https://discord.gg/BGDCFCmMBk")
+2 -5
View File
@@ -1,4 +1,3 @@
import json
import re import re
import shutil import shutil
import subprocess import subprocess
@@ -9,7 +8,7 @@ from pathlib import Path
from caelestia.utils import hypr from caelestia.utils import hypr
from caelestia.utils.notify import close_notification, notify from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir, user_config_path from caelestia.utils.paths import get_config, recording_notif_path, recording_path, recordings_dir
RECORDER = "gpu-screen-recorder" RECORDER = "gpu-screen-recorder"
@@ -65,12 +64,10 @@ class Command:
if self.args.sound: if self.args.sound:
args += ["-a", "default_output"] args += ["-a", "default_output"]
config = get_config()
try: try:
config = json.loads(user_config_path.read_text())
if "record" in config and "extraArgs" in config["record"]: if "record" in config and "extraArgs" in config["record"]:
args += config["record"]["extraArgs"] args += config["record"]["extraArgs"]
except (json.JSONDecodeError, FileNotFoundError):
pass
except TypeError as e: except TypeError as e:
raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}") raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}")
+46 -50
View File
@@ -7,8 +7,8 @@ from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from caelestia.utils import hypr from caelestia.utils import hypr
from caelestia.utils.logging import log_message from caelestia.utils.io import error, fatal, info, log, warn
from caelestia.utils.paths import user_config_path from caelestia.utils.paths import get_config
class WindowRule: class WindowRule:
@@ -52,8 +52,8 @@ class Command:
WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]), WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]),
] ]
config = get_config()
try: try:
config = json.loads(user_config_path.read_text())
if "resizer" in config and "rules" in config["resizer"]: if "resizer" in config and "rules" in config["resizer"]:
rules = [] rules = []
for rule_config in config["resizer"]["rules"]: for rule_config in config["resizer"]["rules"]:
@@ -67,8 +67,8 @@ class Command:
) )
) )
return rules return rules
except (json.JSONDecodeError, KeyError): except KeyError:
log_message("ERROR: invalid config") warn("invalid config, falling back to default rules")
except FileNotFoundError: except FileNotFoundError:
pass pass
@@ -188,12 +188,10 @@ class Command:
command2 = self._make_move_cmd(int(move_x), int(move_y), address) command2 = self._make_move_cmd(int(move_x), int(move_y), address)
hypr.batch(command1, command2) hypr.batch(command1, command2)
log_message( info(f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})")
f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})"
)
except Exception as e: except Exception as e:
log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}") error(f"failed to apply PiP action to window 0x{window_id}: {e}")
def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool: def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool:
dispatch_commands = [] dispatch_commands = []
@@ -214,10 +212,10 @@ class Command:
try: try:
hypr.batch(*dispatch_commands) hypr.batch(*dispatch_commands)
log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") info(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
return True return True
except Exception as e: except Exception as e:
log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}") error(f"failed to apply window actions for window 0x{window_id}: {e}")
return False return False
def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None: def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None:
@@ -236,7 +234,7 @@ class Command:
if re.search(rule.name, window_title): if re.search(rule.name, window_title):
return rule return rule
except re.error: except re.error:
log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'") warn(f"invalid regex pattern in rule '{rule.name}'")
return None return None
@@ -258,7 +256,7 @@ class Command:
window_id = window_id.lstrip(">") window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id): if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}") warn(f"invalid window ID format: {window_id}")
return return
window_info = self._get_window_info(window_id) window_info = self._get_window_info(window_id)
@@ -268,19 +266,19 @@ class Command:
window_title = window_info.get("title", "") window_title = window_info.get("title", "")
initial_title = window_info.get("initialTitle", "") initial_title = window_info.get("initialTitle", "")
log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") log(f"Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
rule = self._match_window_rule(window_title, initial_title) rule = self._match_window_rule(window_title, initial_title)
if rule: if rule:
if self._is_rate_limited(window_id): if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}") log(f"Rate limited: skipping window 0x{window_id}")
return return
log_message(f"Matched rule '{rule.name}' for window 0x{window_id}") info(f"Matched rule '{rule.name}' for window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window title event: {e}") warn(f"failed to parse window title event: {e}")
def _handle_open_event(self, event: str) -> None: def _handle_open_event(self, event: str) -> None:
try: try:
@@ -296,22 +294,22 @@ class Command:
window_id = window_id.lstrip(">") window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id): if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}") warn(f"invalid window ID format: {window_id}")
return return
log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") log(f"New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
rule = self._match_window_rule(title, title) rule = self._match_window_rule(title, title)
if rule: if rule:
if self._is_rate_limited(window_id): if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}") log(f"Rate limited: skipping window 0x{window_id}")
return return
log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}") info(f"Matched rule '{rule.name}' for new window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window open event: {e}") warn(f"failed to parse window open event: {e}")
def run(self) -> None: def run(self) -> None:
if self.args.daemon: if self.args.daemon:
@@ -324,7 +322,7 @@ class Command:
): ):
self._run_active_mode() self._run_active_mode()
else: else:
print( info(
"Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode" "Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode"
) )
@@ -333,28 +331,27 @@ class Command:
try: try:
active_window_result = hypr.message("activewindow") active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"): if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found") error("no active window found")
return return
address = active_window_result.get("address", "") address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"): if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address") error("invalid window address")
return return
window_id = address[2:] # Remove "0x" prefix window_id = address[2:] # Remove "0x" prefix
window_title = active_window_result.get("title", "") window_title = active_window_result.get("title", "")
if not active_window_result.get("floating", False): if not active_window_result.get("floating", False):
print(f"Window '{window_title}' is not floating. PIP only works on floating windows.") warn(f"window '{window_title}' is not floating; PiP only works on floating windows.")
print("Try making it floating first with: hyprctl dispatch togglefloating")
return return
print(f"Applying PIP to active window: '{window_title}'") info(f"Applying PiP to active window: '{window_title}'")
self._apply_pip_action(window_id) self._apply_pip_action(window_id)
print("PIP applied successfully") info("PiP applied successfully")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply PIP to active window: {e}") error(f"failed to apply PiP to active window: {e}")
def _run_active_mode(self) -> None: def _run_active_mode(self) -> None:
try: try:
@@ -371,10 +368,10 @@ class Command:
matching_windows = self._find_matching_windows(temp_rule) matching_windows = self._find_matching_windows(temp_rule)
if not matching_windows: if not matching_windows:
print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") warn(f"no windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
return return
print(f"Found {len(matching_windows)} matching window(s)") info(f"Found {len(matching_windows)} matching window(s)")
# Apply rule to all matching windows # Apply rule to all matching windows
success_count = 0 success_count = 0
@@ -382,41 +379,41 @@ class Command:
window_id = window["address"][2:] # Remove "0x" prefix window_id = window["address"][2:] # Remove "0x" prefix
window_title = window.get("title", "") window_title = window.get("title", "")
print(f"Applying rule to window 0x{window_id}: '{window_title}'") info(f"Applying rule to window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success: if success:
success_count += 1 success_count += 1
print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") info(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply rule: {e}") error(f"failed to apply rule: {e}")
def _apply_to_active_window(self, temp_rule: WindowRule) -> None: def _apply_to_active_window(self, temp_rule: WindowRule) -> None:
"""Apply rule only to the currently active window""" """Apply rule only to the currently active window"""
try: try:
active_window_result = hypr.message("activewindow") active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"): if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found") error("no active window found")
return return
window_title = active_window_result.get("title", "") window_title = active_window_result.get("title", "")
address = active_window_result.get("address", "") address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"): if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address") error("invalid window address")
return return
window_id = address[2:] # Remove "0x" prefix window_id = address[2:] # Remove "0x" prefix
print(f"Applying rule to active window 0x{window_id}: '{window_title}'") info(f"Applying rule to active window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success: if success:
print("Rule applied successfully") info("Rule applied successfully")
else: else:
print("Failed to apply rule") error("failed to apply rule")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply rule to active window: {e}") error(f"failed to apply rule to active window: {e}")
def _find_matching_windows(self, temp_rule: WindowRule) -> list: def _find_matching_windows(self, temp_rule: WindowRule) -> list:
"""Find all windows that match the given rule pattern""" """Find all windows that match the given rule pattern"""
@@ -445,7 +442,7 @@ class Command:
try: try:
matches = bool(re.search(temp_rule.name, window_title)) matches = bool(re.search(temp_rule.name, window_title))
except re.error: except re.error:
print(f"ERROR: Invalid regex pattern '{temp_rule.name}'") warn(f"invalid regex pattern '{temp_rule.name}'")
return [] return []
if matches: if matches:
@@ -454,23 +451,22 @@ class Command:
return matching_windows return matching_windows
except Exception as e: except Exception as e:
print(f"ERROR: Failed to find matching windows: {e}") error(f"failed to find matching windows: {e}")
return [] return []
def _run_daemon(self) -> None: def _run_daemon(self) -> None:
log_message("Hyprland window resizer started") info("Hyprland window resizer started")
log_message(f"Loaded {len(self.window_rules)} window rules") info(f"Loaded {len(self.window_rules)} window rules")
socket_path = Path(hypr.socket2_path) socket_path = Path(hypr.socket2_path)
if not socket_path.exists(): if not socket_path.exists():
log_message(f"ERROR: Hyprland socket not found at {socket_path}") fatal(f"Hyprland socket not found at {socket_path}")
return
try: try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(hypr.socket2_path) sock.connect(hypr.socket2_path)
log_message("Connected to Hyprland socket, listening for events...") info("Connected to Hyprland socket, listening for events...")
while True: while True:
data = sock.recv(4096).decode() data = sock.recv(4096).decode()
@@ -480,6 +476,6 @@ class Command:
self._handle_window_event(line) self._handle_window_event(line)
except KeyboardInterrupt: except KeyboardInterrupt:
log_message("Resizer daemon stopped") info("Resizer daemon stopped")
except Exception as e: except Exception as e:
log_message(f"ERROR: {e}") error(str(e))
+3 -3
View File
@@ -6,7 +6,7 @@ from collections import ChainMap
from typing import Any, Callable, cast from typing import Any, Callable, cast
from caelestia.utils import hypr from caelestia.utils import hypr
from caelestia.utils.paths import user_config_path from caelestia.utils.paths import get_config
def is_subset(superset, subset): def is_subset(superset, subset):
@@ -103,8 +103,8 @@ class Command:
}, },
} }
try: try:
self.cfg = DeepChainMap(json.loads(user_config_path.read_text())["toggles"], self.cfg) self.cfg = DeepChainMap(get_config()["toggles"], self.cfg)
except (FileNotFoundError, json.JSONDecodeError, KeyError): except KeyError:
pass pass
def run(self) -> None: def run(self) -> None:
+58
View File
@@ -0,0 +1,58 @@
import shutil
import tempfile
from pathlib import Path
class Deployer:
"""Places files from the dots clone into their destinations."""
def place(self, src: Path, dest: Path) -> None:
"""Place a whole entry (file or directory tree), replacing any existing dest."""
if src.is_dir():
self.place_dir(src, dest)
else:
self.place_file(src, dest)
def place_dir(self, src: Path, dest: Path) -> None:
"""Place a directory tree recursively, overwriting any existing dest files."""
if dest.is_symlink() or dest.is_file():
self.remove(dest)
dest.mkdir(parents=True, exist_ok=True)
for path in src.rglob("*"):
if path.is_file():
self.place_file(path, dest / path.relative_to(src))
elif path.is_dir():
(dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True)
def place_file(self, src: Path, dest: Path) -> None:
"""Atomically place a single file, replacing any existing dest."""
if dest.is_dir() and not dest.is_symlink():
self.remove(dest)
dest.parent.mkdir(parents=True, exist_ok=True)
f = tempfile.NamedTemporaryFile(dir=dest.parent, delete=False)
f.close()
try:
shutil.copyfile(src, f.name)
shutil.copymode(src, f.name)
Path(f.name).replace(dest)
except BaseException:
Path(f.name).unlink()
raise
def write_new(self, src: Path, dest: Path) -> Path:
"""Write the upstream version alongside dest as <dest>.new and return that path."""
new_path = dest.parent / f"{dest.name}.new"
self.place_file(src, new_path)
return new_path
def remove(self, path: Path) -> None:
if path.is_symlink() or path.is_file():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
+217
View File
@@ -0,0 +1,217 @@
import glob
import os
import re
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from string import Template
from typing import Any
_XDG_DEFAULTS = {
"XDG_CONFIG_HOME": str(Path.home() / ".config"),
"XDG_DATA_HOME": str(Path.home() / ".local/share"),
"XDG_STATE_HOME": str(Path.home() / ".local/state"),
"XDG_CACHE_HOME": str(Path.home() / ".cache"),
}
_GLOB_MAGIC = re.compile(r"[*?[]")
_LOCAL_PREFIX = "local:"
class ManifestError(Exception):
"""Raised when manifest.toml is malformed."""
class ComponentError(Exception):
"""Raised when component flags are invalid or contradictory."""
def expand(text: str) -> Path:
"""Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path."""
env = {**_XDG_DEFAULTS, **os.environ}
return Path(Template(text).safe_substitute(env)).expanduser()
def expand_dests(dest: str) -> list[Path]:
"""Expand globs within a dest path.
Globs from the start until the segment with the last glob so subdirs are
created if they didn't exist previously.
"""
expanded = expand(dest)
if not _GLOB_MAGIC.search(str(expanded)):
return [expanded]
parts = expanded.parts
glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part))
pattern = str(Path(*parts[: glob_idx + 1]))
tail = parts[glob_idx + 1 :]
return [Path(match, *tail) for match in sorted(glob.glob(pattern))]
@dataclass(frozen=True)
class ManifestEntry:
src: str
dest: str
@dataclass(frozen=True)
class ManifestComponent:
name: str
default: bool = False
packages: list[str] = field(default_factory=list)
entries: list[ManifestEntry] = field(default_factory=list)
post_install: list[str] = field(default_factory=list)
post_update: list[str] = field(default_factory=list)
@dataclass
class _ManifestData:
enabled_comps: list[str] = field(default_factory=list)
disabled_comps: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class Manifest:
components: dict[str, ManifestComponent] = field(default_factory=dict)
packages: list[str] = field(default_factory=list)
post_install: list[str] = field(default_factory=list)
post_update: list[str] = field(default_factory=list)
_data: _ManifestData = field(default_factory=_ManifestData, init=False, repr=False)
@property
def enabled_components(self) -> list[str]:
return self._data.enabled_comps
@property
def disabled_components(self) -> list[str]:
return self._data.disabled_comps
@staticmethod
def parse(text: str) -> "Manifest":
try:
raw = tomllib.loads(text)
except tomllib.TOMLDecodeError as e:
raise ManifestError(f"invalid TOML: {e}") from e
post_install = _validate_str_list(raw.get("post_install", []), "post_install")
post_update = _validate_str_list(raw.get("post_update", []), "post_update")
packages = _validate_str_list(raw.get("packages", []), "packages")
components = {}
for comp in raw.get("components", []):
parsed = _parse_component(comp)
components[parsed.name] = parsed
return Manifest(
components=components,
packages=packages,
post_install=post_install,
post_update=post_update,
)
def resolve_components(
self,
enable: list[str] | None = None,
disable: list[str] | None = None,
) -> None:
"""Resolves enabled/disabled components. This MUST be called before calling any other method."""
enable_set = set(enable or [])
disable_set = set(disable or [])
known = set(self.components)
for name in enable_set | disable_set:
if name not in known:
raise ComponentError(f"unknown component: {name}")
conflict = enable_set & disable_set
if conflict:
raise ComponentError(f"component(s) both enabled and disabled: {', '.join(sorted(conflict))}")
enabled = {name for name, comp in self.components.items() if comp.default}
enabled |= enable_set
enabled -= disable_set
self._data.enabled_comps.clear()
self._data.disabled_comps.clear()
for name in self.components:
if name in enabled:
self._data.enabled_comps.append(name)
else:
self._data.disabled_comps.append(name)
def enabled_entries(self) -> list[ManifestEntry]:
"""The entries of every enabled component."""
entries: list[ManifestEntry] = []
for name in self._data.enabled_comps:
entries.extend(self.components[name].entries)
return entries
def enabled_hooks(self, kind: str) -> list[str]:
"""Global + enabled components' hooks of the given kind."""
hooks = list(getattr(self, kind))
for name in self._data.enabled_comps:
hooks.extend(getattr(self.components[name], kind))
return hooks
def enabled_packages(self) -> list[str]:
"""Repo/AUR packages to install."""
return [p for p in self._all_packages() if not p.startswith(_LOCAL_PREFIX)]
def enabled_local_packages(self) -> list[str]:
"""Local PKGBUILD dirs to build.
Local packages are determined by a local: prefix and are
relative dirs instead of package names.
"""
return [p[len(_LOCAL_PREFIX) :] for p in self._all_packages() if p.startswith(_LOCAL_PREFIX)]
def _all_packages(self) -> list[str]:
"""The manifest's top-level packages plus enabled components', in manifest order.
Top-level packages come first, then each enabled component's packages in
component order. Only the first occurrence of each package is kept.
"""
seen: set[str] = set()
ordered: list[str] = []
for pkg in (*self.packages, *(p for c in self._data.enabled_comps for p in self.components[c].packages)):
if pkg not in seen:
seen.add(pkg)
ordered.append(pkg)
return ordered
def _require_key(d: dict[str, Any], key: str, ctx: str) -> Any:
if key not in d:
raise ManifestError(f"{ctx}: missing required key '{key}'")
return d[key]
def _validate_str_list(value: Any, ctx: str) -> list[str]:
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
raise ManifestError(f"{ctx}: expected a list of strings")
return value
def _parse_entry(d: Any) -> ManifestEntry:
if not isinstance(d, dict):
raise ManifestError("entry: expected a table")
return ManifestEntry(src=_require_key(d, "src", "entry"), dest=_require_key(d, "dest", "entry"))
def _parse_component(d: dict[str, Any]) -> ManifestComponent:
name = _require_key(d, "name", "component")
return ManifestComponent(
name=name,
default=bool(d.get("default", False)),
packages=_validate_str_list(d.get("packages", []), f"component '{name}' packages"),
entries=[_parse_entry(e) for e in d.get("entries", [])],
post_install=_validate_str_list(d.get("post_install", []), f"component '{name}' post_install"),
post_update=_validate_str_list(d.get("post_update", []), f"component '{name}' post_update"),
)
+130
View File
@@ -0,0 +1,130 @@
import os
import shutil
import subprocess
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from caelestia.utils.io import fatal, info
DEFAULT_AUR_HELPER = "paru"
AUR_HELPERS = DEFAULT_AUR_HELPER, "yay"
def _install_aur_helper(helper: str, noconfirm: bool = False) -> None:
pacman_cmd = ["sudo", "pacman", "-S", "--needed", "git", "base-devel"]
if noconfirm:
pacman_cmd.append("--noconfirm")
subprocess.run(pacman_cmd, check=True)
repo_url = f"https://aur.archlinux.org/{helper}.git"
with tempfile.TemporaryDirectory() as repo_dir:
subprocess.run(["git", "clone", repo_url, repo_dir], check=True)
makepkg_cmd = ["makepkg", "-si"]
if noconfirm:
makepkg_cmd.append("--noconfirm")
subprocess.run(makepkg_cmd, cwd=repo_dir, check=True)
if helper == "yay":
subprocess.run(["yay", "-Y", "--gendb"], check=True)
subprocess.run(["yay", "-Y", "--devel", "--save"], check=True)
elif helper == "paru":
subprocess.run(["paru", "--gendb"], check=True)
class PackageInstaller(ABC):
@staticmethod
def get(helper: str | None = None, noconfirm: bool = False) -> "PackageInstaller":
"""Pick a package installer: the requested/detected AUR helper on Arch, else a no-op."""
# Not on Arch, can't install packages
if shutil.which("pacman") is None:
return NoopInstaller()
# Explicitly given
if helper:
if not shutil.which(helper):
if helper not in AUR_HELPERS:
fatal(f"given AUR helper {helper} is not installed and is unable to be installed automatically.")
info(f"Given AUR helper not installed. Installing {helper}...")
_install_aur_helper(helper, noconfirm)
return ArchInstaller(helper, noconfirm)
# Not given, find installed one
for candidate in AUR_HELPERS:
if shutil.which(candidate):
return ArchInstaller(candidate, noconfirm)
info(f"No AUR helper found. Installing {DEFAULT_AUR_HELPER}...")
_install_aur_helper(DEFAULT_AUR_HELPER, noconfirm)
return ArchInstaller(DEFAULT_AUR_HELPER, noconfirm)
# --- Abstract methods ---
@abstractmethod
def install(self, packages: list[str]) -> None: ...
@abstractmethod
def remove(self, packages: list[str]) -> None: ...
@abstractmethod
def build_install(self, directory: Path) -> list[str]:
"""Build and install the PKGBUILD in `directory`, returning the installed package names."""
class NoopInstaller(PackageInstaller):
"""Used off Arch, where the dots' packages are not available via pacman/AUR."""
def install(self, packages: list[str]) -> None:
if packages:
info(f"Skipping package install (not on Arch): {', '.join(packages)}")
def remove(self, packages: list[str]) -> None:
if packages:
info(f"Skipping package removal (not on Arch): {', '.join(packages)}")
def build_install(self, directory: Path) -> list[str]:
info(f"Skipping local package build (not on Arch): {directory}")
return []
class ArchInstaller(PackageInstaller):
def __init__(self, helper: str, noconfirm: bool = False) -> None:
self.helper = helper
self.flags = ["--noconfirm"] if noconfirm else []
def install(self, packages: list[str], extra_flags: list[str] | None = None) -> None:
if not packages:
return
subprocess.run([self.helper, "-S", "--needed", *self.flags, *(extra_flags or []), *packages], check=True)
def remove(self, packages: list[str]) -> None:
if not packages:
return
subprocess.run([self.helper, "-Rns", *self.flags, *packages], check=True)
def build_install(self, directory: Path) -> list[str]:
srcinfo = subprocess.check_output(["makepkg", "--printsrcinfo"], cwd=directory, text=True)
names = []
depends = []
for line in srcinfo.splitlines():
key, sep, value = line.partition("=")
if not sep:
continue
key = key.strip()
if key == "pkgname":
names.append(value.strip())
elif key == "depends":
depends.append(value.strip())
self.install(depends, extra_flags=["--asdeps"])
# Stop makepkg from resetting sudo
env = {**os.environ, "PACMAN_AUTH": "sudo"}
# -f = force, -s = sync deps, -i = install
subprocess.run(["makepkg", "-fsi", *self.flags], cwd=directory, env=env, check=True)
return names
+107
View File
@@ -0,0 +1,107 @@
import shutil
import subprocess
from pathlib import Path
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.paths import dots_dir, get_config
class SourceError(Exception):
"""Raised when a git operation against the dots clone fails."""
class DotsSource:
_fetched_source: bool = False
def __init__(self) -> None:
cfg = get_config().get("dots", {})
self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git")
self.branch = cfg.get("branch", "main")
@property
def remote_ref(self) -> str:
return f"origin/{self.branch}"
def exists(self) -> bool:
return (dots_dir / ".git").is_dir()
def working_path(self, relpath: str | Path) -> Path:
"""Get a Path relative to the dots dir."""
return dots_dir / relpath
def ensure(self) -> None:
"""Clone the repo if absent, otherwise fetch the latest refs.
If the configured url changed, the stale clone is removed and re-cloned
from the new source.
"""
if self.exists():
if self.current_url() == self.url:
if DotsSource._fetched_source:
return
self._git("fetch", "--prune", "origin", self.branch)
DotsSource._fetched_source = True
return
shutil.rmtree(dots_dir)
dots_dir.parent.mkdir(parents=True, exist_ok=True)
self._run("git", "clone", "--branch", self.branch, self.url, str(dots_dir))
DotsSource._fetched_source = True
def current_url(self) -> str:
return self._git("remote", "get-url", "origin").strip()
def checkout_tip(self) -> str:
"""Reset the working tree to the fetched tip and return its commit hash."""
self._git("reset", "--hard", self.remote_ref)
return self.tip_rev()
def tip_rev(self) -> str:
return self._git("rev-parse", self.remote_ref).strip()
def changed_files(self, base: str, head: str) -> list[str]:
"""Repo-relative paths that differ between two revisions."""
out = self._git("diff", "--name-only", base, head)
return [line for line in out.splitlines() if line]
def clean(self) -> None:
"""Remove all untracked files in the git repo."""
self._git("clean", "-fdx")
# --- Accessors ---
def manifest_at(self, ref: str) -> Manifest:
return Manifest.parse(self.text_at(ref, "manifest.toml"))
def text_at(self, ref: str, relpath: str) -> str:
return self._git("show", f"{ref}:{relpath}")
def blob_at(self, ref: str, relpath: str) -> bytes:
return self._git_bytes("show", f"{ref}:{relpath}")
def files_at(self, ref: str, relpath: str) -> list[str]:
"""Repo-relative paths of all files under relpath at ref (the path itself if it is a file)."""
out = self._git("ls-tree", "-r", "--name-only", ref, "--", relpath)
return [line for line in out.splitlines() if line]
# --- Helpers ---
def _git(self, *args: str) -> str:
return self._run("git", "-C", str(dots_dir), *args)
def _git_bytes(self, *args: str) -> bytes:
result = subprocess.run(["git", "-C", str(dots_dir), *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed")
return result.stdout
def _run(self, *cmd: str) -> str:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise SourceError(result.stderr.strip() or f"{' '.join(cmd)} failed")
return result.stdout
+52
View File
@@ -0,0 +1,52 @@
import json
from dataclasses import dataclass, field
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER
from caelestia.utils.io import warn
from caelestia.utils.paths import atomic_dump, dots_state_path
@dataclass
class DotsState:
# The AUR helper selected selected at install time
aur_helper: str = "paru"
# The git rev of currently applied dots version
applied_rev: str | None = None
# The currently enabled components
enabled_components: list[str] = field(default_factory=list)
# Previously installed packages/local packages
packages: list[str] = field(default_factory=list)
local_packages: dict[str, list[str]] = field(default_factory=dict)
@staticmethod
def load() -> "DotsState":
try:
data = json.loads(dots_state_path.read_text())
except FileNotFoundError:
return DotsState()
except json.JSONDecodeError:
warn("failed to parse current dots state.")
return DotsState()
return DotsState(
aur_helper=data.get("aur_helper", DEFAULT_AUR_HELPER),
applied_rev=data.get("applied_rev"),
enabled_components=data.get("enabled_components", []),
packages=data.get("packages", []),
local_packages=data.get("local_packages", {}),
)
def save(self) -> None:
atomic_dump(
dots_state_path,
{
"aur_helper": self.aur_helper,
"applied_rev": self.applied_rev,
"enabled_components": self.enabled_components,
"packages": self.packages,
"local_packages": self.local_packages,
},
)
+88
View File
@@ -0,0 +1,88 @@
import sys
from typing import Never
LOG_COLOUR: int = 2
INFO_COLOUR: int = 0
PROMPT_COLOUR: int = 36
WARNING_COLOUR: int = 33
ERROR_COLOUR: int = 31
_disable_input: bool = False
def disable_input() -> None:
global _disable_input
_disable_input = True
def log_exception(func):
"""Log exceptions to stderr instead of raising.
Used by the `apply_()` functions so that an exception, when applying
a theme, does not prevent the other themes from being applied.
"""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
error(f'exception during "{func.__name__}()": {str(e)}')
return wrapper
def format_msg(colour: int, prefix: bool, msg: str) -> str:
return f"\033[{colour}m{':: ' if prefix else ''}{msg}\033[0m"
def log(msg: str, prefix: bool = True) -> None:
print(format_msg(LOG_COLOUR, prefix, msg))
def info(msg: str, prefix: bool = True) -> None:
print(format_msg(INFO_COLOUR, prefix, msg))
def warn(msg: str, prefix: bool = True) -> None:
print(format_msg(WARNING_COLOUR, prefix, f"Warning: {msg}"))
def error(err: str | Exception, prefix: bool = True) -> None:
print(format_msg(ERROR_COLOUR, prefix, f"Error: {err}"), file=sys.stderr)
def fatal(err: str | Exception, prefix: bool = True) -> Never:
print(format_msg(ERROR_COLOUR, prefix, f"Fatal: {err}"), file=sys.stderr)
sys.exit(1)
def _input(prompt: str) -> str:
if _disable_input:
print(prompt, end="")
return ""
try:
return input(prompt)
except (KeyboardInterrupt, EOFError):
print()
raise KeyboardInterrupt()
def prompt(msg: str, prefix: bool = True, end: str = " ") -> str:
return _input(format_msg(PROMPT_COLOUR, prefix, msg) + end)
def confirm(msg: str, prefix: bool = True, default: bool = True) -> bool:
suffix = " [Y/n]" if default else " [y/N]"
answer = prompt(msg + suffix, prefix=prefix).strip().lower()
if not answer:
return default
return answer in ("y", "yes")
def pause() -> None:
if _disable_input:
return
_input("\n\033[2m\033[3m(Ctrl+C to exit, enter to continue)\033[0m")
print("\033[1A\r\033[2K\033[1A\r\033[2K", end="") # Clear pause prompt
-22
View File
@@ -1,22 +0,0 @@
from time import strftime
def log_message(message: str) -> None:
timestamp = strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def log_exception(func):
"""Log exceptions to stdout instead of raising
Used by the `apply_()` functions so that an exception, when applying
a theme, does not prevent the other themes from being applied.
"""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
log_message(f'Error during execution of "{func.__name__}()": {str(e)}')
return wrapper
+31 -5
View File
@@ -1,11 +1,12 @@
import hashlib import hashlib
import json import json
import os import os
import shutil
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from caelestia.utils.io import warn
config_dir: Path = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) config_dir: Path = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config"))
data_dir: Path = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local/share")) data_dir: Path = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local/share"))
state_dir: Path = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local/state")) state_dir: Path = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local/state"))
@@ -24,6 +25,10 @@ templates_dir: Path = cli_data_dir / "templates"
user_templates_dir: Path = c_config_dir / "templates" user_templates_dir: Path = c_config_dir / "templates"
theme_dir: Path = c_state_dir / "theme" theme_dir: Path = c_state_dir / "theme"
config_backup_dir: Path = config_dir.parent / f"{config_dir.name}.bak"
dots_dir: Path = c_state_dir / "dots"
dots_state_path: Path = c_state_dir / "dots-state.json"
scheme_path: Path = c_state_dir / "scheme.json" scheme_path: Path = c_state_dir / "scheme.json"
scheme_data_dir: Path = cli_data_dir / "schemes" scheme_data_dir: Path = cli_data_dir / "schemes"
scheme_cache_dir: Path = c_cache_dir / "schemes" scheme_cache_dir: Path = c_cache_dir / "schemes"
@@ -52,8 +57,29 @@ def compute_hash(path: Path | str) -> str:
return sha.hexdigest() return sha.hexdigest()
def atomic_write(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
f = tempfile.NamedTemporaryFile("w", dir=path.parent, delete=False)
try:
with f:
f.write(content)
f.flush()
os.fsync(f.fileno())
os.replace(f.name, path)
except BaseException:
os.unlink(f.name)
raise
def atomic_dump(path: Path, content: dict[str, Any]) -> None: def atomic_dump(path: Path, content: dict[str, Any]) -> None:
with tempfile.NamedTemporaryFile("w") as f: atomic_write(path, json.dumps(content))
json.dump(content, f)
f.flush()
shutil.move(f.name, path) def get_config() -> dict[str, Any]:
try:
return json.loads(user_config_path.read_text())
except json.JSONDecodeError:
warn("failed to parse config, invalid JSON")
except FileNotFoundError:
pass
return {}
+24 -35
View File
@@ -8,18 +8,19 @@ import tempfile
from pathlib import Path from pathlib import Path
from caelestia.utils.colour import get_dynamic_colours from caelestia.utils.colour import get_dynamic_colours
from caelestia.utils.logging import log_exception from caelestia.utils.hypr import is_lua_config
from caelestia.utils.io import log_exception
from caelestia.utils.paths import ( from caelestia.utils.paths import (
atomic_write,
c_state_dir, c_state_dir,
config_dir, config_dir,
data_dir, data_dir,
get_config,
templates_dir, templates_dir,
theme_dir, theme_dir,
user_config_path,
user_templates_dir, user_templates_dir,
) )
from caelestia.utils.scheme import get_scheme from caelestia.utils.scheme import get_scheme
from caelestia.utils.hypr import is_lua_config
def gen_conf(colours: dict[str, str]) -> str: def gen_conf(colours: dict[str, str]) -> str:
@@ -28,6 +29,7 @@ def gen_conf(colours: dict[str, str]) -> str:
conf += f"${name} = {colour}\n" conf += f"${name} = {colour}\n"
return conf return conf
def gen_lua(colours: dict[str, str]) -> str: def gen_lua(colours: dict[str, str]) -> str:
lua = "return {\n" lua = "return {\n"
for name, colour in colours.items(): for name, colour in colours.items():
@@ -35,6 +37,7 @@ def gen_lua(colours: dict[str, str]) -> str:
lua += "}" lua += "}"
return lua return lua
def gen_scss(colours: dict[str, str]) -> str: def gen_scss(colours: dict[str, str]) -> str:
scss = "" scss = ""
for name, colour in colours.items(): for name, colour in colours.items():
@@ -117,15 +120,6 @@ def gen_sequences(colours: dict[str, str]) -> str:
) )
def write_file(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile("w") as f:
f.write(content)
f.flush()
shutil.move(f.name, path)
@log_exception @log_exception
def apply_terms(sequences: str) -> None: def apply_terms(sequences: str) -> None:
state = c_state_dir / "sequences.txt" state = c_state_dir / "sequences.txt"
@@ -152,57 +146,55 @@ def apply_terms(sequences: str) -> None:
@log_exception @log_exception
def apply_hypr(conf: str) -> None: def apply_hypr(conf: str) -> None:
ext = "lua" if is_lua_config() else "conf" ext = "lua" if is_lua_config() else "conf"
write_file(config_dir / f"hypr/scheme/current.{ext}", conf) atomic_write(config_dir / f"hypr/scheme/current.{ext}", conf)
@log_exception @log_exception
def apply_discord(scss: str) -> None: def apply_discord(scss: str) -> None:
import tempfile
with tempfile.TemporaryDirectory("w") as tmp_dir: with tempfile.TemporaryDirectory("w") as tmp_dir:
(Path(tmp_dir) / "_colours.scss").write_text(scss) (Path(tmp_dir) / "_colours.scss").write_text(scss)
conf = subprocess.check_output(["sass", "-I", tmp_dir, templates_dir / "discord.scss"], text=True) conf = subprocess.check_output(["sass", "-I", tmp_dir, templates_dir / "discord.scss"], text=True)
for client in "Equicord", "Vencord", "BetterDiscord", "equibop", "vesktop", "legcord": for client in "Equicord", "Vencord", "BetterDiscord", "equibop", "vesktop", "legcord":
write_file(config_dir / client / "themes/caelestia.theme.css", conf) atomic_write(config_dir / client / "themes/caelestia.theme.css", conf)
@log_exception @log_exception
def apply_pandora(colours: dict[str, str], mode: str) -> None: def apply_pandora(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / "pandora.json", hash=True) template = gen_replace(colours, templates_dir / "pandora.json", hash=True)
template = template.replace("{{ $mode }}", mode) template = template.replace("{{ $mode }}", mode)
write_file(data_dir / "PandoraLauncher/themes/caelestia.json", template) atomic_write(data_dir / "PandoraLauncher/themes/caelestia.json", template)
@log_exception @log_exception
def apply_spicetify(colours: dict[str, str], mode: str) -> None: def apply_spicetify(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / f"spicetify-{mode}.ini") template = gen_replace(colours, templates_dir / f"spicetify-{mode}.ini")
write_file(config_dir / "spicetify/Themes/caelestia/color.ini", template) atomic_write(config_dir / "spicetify/Themes/caelestia/color.ini", template)
@log_exception @log_exception
def apply_fuzzel(colours: dict[str, str]) -> None: def apply_fuzzel(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "fuzzel.ini") template = gen_replace(colours, templates_dir / "fuzzel.ini")
write_file(config_dir / "fuzzel/fuzzel.ini", template) atomic_write(config_dir / "fuzzel/fuzzel.ini", template)
@log_exception @log_exception
def apply_btop(colours: dict[str, str]) -> None: def apply_btop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "btop.theme", hash=True) template = gen_replace(colours, templates_dir / "btop.theme", hash=True)
write_file(config_dir / "btop/themes/caelestia.theme", template) atomic_write(config_dir / "btop/themes/caelestia.theme", template)
subprocess.run(["killall", "-USR2", "btop"], stderr=subprocess.DEVNULL) subprocess.run(["killall", "-USR2", "btop"], stderr=subprocess.DEVNULL)
@log_exception @log_exception
def apply_nvtop(colours: dict[str, str]) -> None: def apply_nvtop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "nvtop.colors", hash=True) template = gen_replace(colours, templates_dir / "nvtop.colors", hash=True)
write_file(config_dir / "nvtop/nvtop.colors", template) atomic_write(config_dir / "nvtop/nvtop.colors", template)
@log_exception @log_exception
def apply_htop(colours: dict[str, str]) -> None: def apply_htop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "htop.theme", hash=True) template = gen_replace(colours, templates_dir / "htop.theme", hash=True)
write_file(config_dir / "htop/htoprc", template) atomic_write(config_dir / "htop/htoprc", template)
subprocess.run(["killall", "-USR2", "htop"], stderr=subprocess.DEVNULL) subprocess.run(["killall", "-USR2", "htop"], stderr=subprocess.DEVNULL)
@@ -318,8 +310,8 @@ def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None)
for gtk_version in ["gtk-3.0", "gtk-4.0"]: for gtk_version in ["gtk-3.0", "gtk-4.0"]:
gtk_config_dir = config_dir / gtk_version gtk_config_dir = config_dir / gtk_version
write_file(gtk_config_dir / "gtk.css", gtk_template) atomic_write(gtk_config_dir / "gtk.css", gtk_template)
write_file(gtk_config_dir / "thunar.css", thunar_template) atomic_write(gtk_config_dir / "thunar.css", thunar_template)
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/gtk-theme", "'adw-gtk3-dark'"]) subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/gtk-theme", "'adw-gtk3-dark'"])
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/color-scheme", f"'prefer-{mode}'"]) subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/color-scheme", f"'prefer-{mode}'"])
@@ -332,13 +324,13 @@ def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None)
@log_exception @log_exception
def apply_qt(colours: dict[str, str], mode: str, icon_theme: str | None = None) -> None: def apply_qt(colours: dict[str, str], mode: str, icon_theme: str | None = None) -> None:
colours = gen_replace(colours, templates_dir / f"qt{mode}.colors", hash=True) colours = gen_replace(colours, templates_dir / f"qt{mode}.colors", hash=True)
write_file(config_dir / "qtengine/caelestia.colors", colours) atomic_write(config_dir / "qtengine/caelestia.colors", colours)
config = (templates_dir / "qtengine.json").read_text() config = (templates_dir / "qtengine.json").read_text()
config = config.replace("{{ $mode }}", mode.capitalize()) config = config.replace("{{ $mode }}", mode.capitalize())
if icon_theme is not None: if icon_theme is not None:
config = config.replace(f'"iconTheme": "Papirus-{mode.capitalize()}"', f'"iconTheme": "{icon_theme}"') config = config.replace(f'"iconTheme": "Papirus-{mode.capitalize()}"', f'"iconTheme": "{icon_theme}"')
write_file(config_dir / "qtengine/config.json", config) atomic_write(config_dir / "qtengine/config.json", config)
@log_exception @log_exception
@@ -347,7 +339,7 @@ def apply_warp(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / "warp.yaml", hash=True) template = gen_replace(colours, templates_dir / "warp.yaml", hash=True)
template = template.replace("{{ $warp_mode }}", warp_mode) template = template.replace("{{ $warp_mode }}", warp_mode)
write_file(data_dir / "warp-terminal/themes/caelestia.yaml", template) atomic_write(data_dir / "warp-terminal/themes/caelestia.yaml", template)
@log_exception @log_exception
@@ -369,7 +361,7 @@ def apply_chromium(colours: dict[str, str]) -> None:
print(f"Unable to create {policy_dir} directory") print(f"Unable to create {policy_dir} directory")
continue continue
# Use tee instead of write_file cause we need sudo # Use tee instead of atomic_write cause we need sudo
subprocess.run( subprocess.run(
["sudo", "-n", "tee", str(policy_dir / "caelestia.json")], ["sudo", "-n", "tee", str(policy_dir / "caelestia.json")],
input=json.dumps({"BrowserThemeColor": theme_color, "BrowserColorScheme": "device"}), input=json.dumps({"BrowserThemeColor": theme_color, "BrowserColorScheme": "device"}),
@@ -392,13 +384,13 @@ def apply_zed(colours: dict[str, str], mode: str) -> None:
theme_path.unlink() theme_path.unlink()
content = gen_replace_dynamic(colours, templates_dir / "zed.json", mode) content = gen_replace_dynamic(colours, templates_dir / "zed.json", mode)
write_file(theme_path, content) atomic_write(theme_path, content)
@log_exception @log_exception
def apply_cava(colours: dict[str, str]) -> None: def apply_cava(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "cava.conf", hash=True) template = gen_replace(colours, templates_dir / "cava.conf", hash=True)
write_file(config_dir / "cava/config", template) atomic_write(config_dir / "cava/config", template)
subprocess.run(["killall", "-USR2", "cava"], stderr=subprocess.DEVNULL) subprocess.run(["killall", "-USR2", "cava"], stderr=subprocess.DEVNULL)
@@ -410,7 +402,7 @@ def apply_user_templates(colours: dict[str, str], mode: str) -> None:
for file in user_templates_dir.iterdir(): for file in user_templates_dir.iterdir():
if file.is_file(): if file.is_file():
content = gen_replace_dynamic(colours, file, mode) content = gen_replace_dynamic(colours, file, mode)
write_file(theme_dir / file.name, content) atomic_write(theme_dir / file.name, content)
def apply_colours(colours: dict[str, str], mode: str) -> None: def apply_colours(colours: dict[str, str], mode: str) -> None:
@@ -425,10 +417,7 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
except BlockingIOError: except BlockingIOError:
return return
try: cfg = get_config().get("theme", {})
cfg = json.loads(user_config_path.read_text())["theme"]
except (FileNotFoundError, json.JSONDecodeError, KeyError):
cfg = {}
def check(key: str) -> bool: def check(key: str) -> bool:
return cfg[key] if key in cfg else True return cfg[key] if key in cfg else True
+19 -23
View File
@@ -2,7 +2,6 @@ import json
import os import os
import random import random
import subprocess import subprocess
from argparse import Namespace from argparse import Namespace
from pathlib import Path from pathlib import Path
from typing import cast from typing import cast
@@ -11,12 +10,12 @@ from materialyoucolor.hct import Hct
from materialyoucolor.utils.color_utils import argb_from_rgb from materialyoucolor.utils.color_utils import argb_from_rgb
from PIL import Image from PIL import Image
from caelestia.utils.colourfulness import get_variant
from caelestia.utils.hypr import message from caelestia.utils.hypr import message
from caelestia.utils.material import get_colours_for_image from caelestia.utils.material import get_colours_for_image
from caelestia.utils.colourfulness import get_variant
from caelestia.utils.paths import ( from caelestia.utils.paths import (
compute_hash, compute_hash,
user_config_path, get_config,
wallpaper_link_path, wallpaper_link_path,
wallpaper_path_path, wallpaper_path_path,
wallpaper_thumbnail_path, wallpaper_thumbnail_path,
@@ -186,26 +185,23 @@ def set_wallpaper(wall: Path, no_smart: bool) -> None:
apply_colours(scheme.colours, scheme.mode) apply_colours(scheme.colours, scheme.mode)
# Run custom post-hook if configured # Run custom post-hook if configured
try: cfg = get_config().get("wallpaper", {})
cfg = json.loads(user_config_path.read_text()).get("wallpaper", {}) if post_hook := cfg.get("postHook"):
if post_hook := cfg.get("postHook"): subprocess.run(
subprocess.run( post_hook,
post_hook, shell=True,
shell=True, env={
env={ **os.environ,
**os.environ, "WALLPAPER_PATH": str(wall),
"WALLPAPER_PATH": str(wall), "SCHEME_NAME": scheme.name,
"SCHEME_NAME": scheme.name, "SCHEME_FLAVOUR": scheme.flavour,
"SCHEME_FLAVOUR": scheme.flavour, "SCHEME_MODE": scheme.mode,
"SCHEME_MODE": scheme.mode, "SCHEME_VARIANT": scheme.variant,
"SCHEME_VARIANT": scheme.variant, "SCHEME_COLOURS": json.dumps(scheme.colours),
"SCHEME_COLOURS": json.dumps(scheme.colours), "THUMBNAIL_PATH": str(thumb),
"THUMBNAIL_PATH": str(thumb), },
}, stderr=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, )
)
except (FileNotFoundError, json.JSONDecodeError):
pass
def set_random(args: Namespace) -> None: def set_random(args: Namespace) -> None: