diff --git a/bin/test_in_iso_env.sh b/bin/test_in_iso_env.sh new file mode 100755 index 00000000..d6072eb7 --- /dev/null +++ b/bin/test_in_iso_env.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env sh + +export HOME=/tmp/install-test +export XDG_CONFIG_HOME=$HOME/.config +export XDG_DATA_HOME=$HOME/.local/share +export XDG_STATE_HOME=$HOME/.local/state +export XDG_CACHE_HOME=$HOME/.cache + +"$@" diff --git a/completions/caelestia.fish b/completions/caelestia.fish index 80f177cb..0759525d 100644 --- a/completions/caelestia.fish +++ b/completions/caelestia.fish @@ -1,7 +1,7 @@ set -l seen '__fish_seen_subcommand_from' set -l has_opt '__fish_contains_opt' -set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer +set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install set -l not_seen "not $seen $commands" # Disable file completions @@ -20,6 +20,7 @@ complete -c caelestia -n $not_seen -a 'clipboard' -d 'Open clipboard history' complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities' complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper' complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer' +complete -c caelestia -n $not_seen -a 'install' -d 'Install the Caelestia dotfiles' # Shell set -l commands mpris drawers wallpaper notifs @@ -126,3 +127,9 @@ complete -c caelestia -n "$seen emoji" -s 'f' -l 'fetch' -d 'Fetch emoji/glyph d complete -c caelestia -n "$seen resizer" -s 'd' -l 'daemon' -d 'Start in daemon mode' complete -c caelestia -n "$seen resizer" -a 'pip' -d 'Quick pip mode' complete -c caelestia -n "$seen resizer" -a 'active' -d 'Select the active window' + +# Install (component flags come from the manifest, so are not completed statically) +complete -c caelestia -n "$seen install" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r +complete -c caelestia -n "$seen install" -l 'enable-components' -d 'List of components to enable' -r +complete -c caelestia -n "$seen install" -l 'disable-components' -d 'List of components to disable' -r +complete -c caelestia -n "$seen install" -l 'noconfirm' -d 'Use defaults for all prompts' diff --git a/src/caelestia/__init__.py b/src/caelestia/__init__.py index d1eef1ee..f07cfe70 100644 --- a/src/caelestia/__init__.py +++ b/src/caelestia/__init__.py @@ -1,12 +1,16 @@ from caelestia.parser import parse_args +from caelestia.utils.io import log from caelestia.utils.version import print_version def main() -> None: - parser, args = parse_args() - if args.version: - print_version() - elif "cls" in args: - args.cls(args).run() - else: - parser.print_help() + try: + parser, args = parse_args() + if args.version: + print_version() + elif "cls" in args: + args.cls(args).run() + else: + parser.print_help() + except KeyboardInterrupt: + log("Exiting...") diff --git a/src/caelestia/parser.py b/src/caelestia/parser.py index aafa8525..2e24ed88 100644 --- a/src/caelestia/parser.py +++ b/src/caelestia/parser.py @@ -1,6 +1,21 @@ import argparse +import sys -from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper +from caelestia.subcommands import ( + clipboard, + emoji, + install, + record, + resizer, + scheme, + screenshot, + shell, + toggle, + wallpaper, +) +from caelestia.utils.dots.manifest import Manifest +from caelestia.utils.dots.source import DotsSource +from caelestia.utils.io import warn from caelestia.utils.paths import wallpapers_dir from caelestia.utils.scheme import get_scheme_names, scheme_variants from caelestia.utils.wallpaper import get_wallpaper @@ -128,4 +143,56 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]: resizer_parser.add_argument("height", nargs="?", help="height to resize to") resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)") + # Create parser for install opts + install_parser = command_parser.add_parser( + "install", + help="install the Caelestia dotfiles", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + install_parser.set_defaults(cls=install.Command) + install_parser.add_argument("--aur-helper", choices=["yay", "paru"], help="the AUR helper to use") + install_parser.add_argument( + "--enable-components", metavar="LIST", help="comma-separated list of components to enable" + ) + install_parser.add_argument( + "--disable-components", metavar="LIST", help="comma-separated list of components to disable" + ) + install_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts") + _set_install_epilog(install_parser) + return parser, parser.parse_args() + + +def _set_install_epilog(install_parser: argparse.ArgumentParser) -> None: + """Add components if using install subcommand""" + + if len(sys.argv) > 1 and sys.argv[1] == "install": + manifest = _load_install_manifest() + if manifest is not None and manifest.components: + install_parser.epilog = _components_epilog(manifest) + + +def _load_install_manifest() -> Manifest | None: + source = DotsSource() + try: + source.ensure() + return source.manifest_at(source.remote_ref) + except Exception as e: + warn(f"failed to load manifest from dots repo ({e})\n", prefix=False) + return None + + +def _components_epilog(manifest: Manifest) -> str: + def e(*v: int) -> str: + return f"\033[{';'.join(str(c) for c in v)}m" + + def b(c: int) -> str: + return e(1, c) + + reset = e(0) + + width = max(len(name) for name in manifest.components) + lines = [f"{b(34)}available components (for --enable-components / --disable-components):{reset}"] + for name, comp in manifest.components.items(): + lines.append(f" {b(32)}{name:<{width}}{reset}\t{'(default)' if comp.default else '(off)'}") + return "\n".join(lines) diff --git a/src/caelestia/subcommands/install.py b/src/caelestia/subcommands/install.py new file mode 100644 index 00000000..a8bc8301 --- /dev/null +++ b/src/caelestia/subcommands/install.py @@ -0,0 +1,244 @@ +import os +import shutil +import subprocess +import textwrap +from argparse import Namespace +from pathlib import Path + +from caelestia.utils.dots.deployer import Deployer +from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError, expand, expand_dests +from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER, PackageInstaller +from caelestia.utils.dots.source import DotsSource, SourceError +from caelestia.utils.dots.state import DotsState +from caelestia.utils.io import PROMPT_COLOUR, confirm, disable_input, fatal, format_msg, info, log, pause, prompt, warn +from caelestia.utils.paths import ( + config_backup_dir, + config_dir, + dots_dir, +) + + +def _parse_list_arg(value: str | None) -> list[str] | None: + if value is None: + return None + return [item.strip() for item in value.split(",") if item.strip()] + + +class Command: + args: Namespace + + def __init__(self, args: Namespace) -> None: + self.args = args + + def run(self) -> None: + if self.args.noconfirm: + disable_input() + + self.print_greeting() + self.create_backup() + + source, tip, manifest = self.fetch_manifest() + self.deploy_configs(source, manifest) + helper, packages, local_packages = self.install_packages(source, manifest) + self.run_hooks(manifest) + + DotsState( + aur_helper=helper, + applied_rev=tip, + enabled_components=manifest.enabled_components, + packages=packages, + local_packages=local_packages, + ).save() + + self.print_done() + + def print_greeting(self) -> None: + print( + "\033[38;2;150;241;241m" # Caelestia colour + + textwrap.dedent( + r""" + ╭─────────────────────────────────────────────────╮ + │ ______ __ __ _ │ + │ / ____/___ ____ / /__ _____/ /_(_)___ _ │ + │ / / / __ `/ _ \/ / _ \/ ___/ __/ / __ `/ │ + │ / /___/ /_/ / __/ / __(__ ) /_/ / /_/ / │ + │ \____/\__,_/\___/_/\___/____/\__/_/\__,_/ │ + │ │ + ╰─────────────────────────────────────────────────╯ + """ + ) + + "\033[0m" + ) + info("Welcome to the Caelestia dotfiles installer!") + info("Here's a quick overview on what this command is going to do:") + info(" - Install dependencies") + info(" - Install config files") + info("The installer does NOT set up hardware/system level configs (e.g. drivers). Please do this yourself.") + pause() + print() + + def create_backup(self) -> None: + if config_dir.exists(): + if not confirm("Back up the config directory?", default=True): + return + + log(f"Creating a backup of {config_dir}...") + if config_backup_dir.exists(): + if not confirm("A backup already exists, overwrite?", default=False): + info("Not creating backup.") + return + + log("Deleting old backup...") + shutil.rmtree(config_backup_dir) + + shutil.copytree(config_dir, config_backup_dir, symlinks=True) + info(f"Created backup at {config_backup_dir}") + + def fetch_manifest(self) -> tuple[DotsSource, str, Manifest]: + print() + log("Fetching dots repo...") + source = DotsSource() + try: + source.ensure() + tip = source.checkout_tip() + except SourceError as e: + fatal(e) + + enable = _parse_list_arg(self.args.enable_components) + disable = _parse_list_arg(self.args.disable_components) + try: + manifest = source.manifest_at(tip) + manifest.resolve_components(enable=enable, disable=disable) + + if enable is None and disable is None: + self.prompt_optional_components(manifest) + except (SourceError, ManifestError, ComponentError) as e: + fatal(e) + + names = ", ".join(manifest.enabled_components) or "none" + info(f"Enabled components: {names}") + + return source, tip, manifest + + def prompt_optional_components(self, manifest: Manifest) -> None: + comp_arr = manifest.disabled_components + if not comp_arr: + return + + print(format_msg(PROMPT_COLOUR, True, "Components to enable?")) + max_idx_w = len(str(len(comp_arr))) + for i, comp in enumerate(comp_arr): + print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{comp}")) + print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)")) + + def _valid_v(v: str) -> int: + try: + i_v = int(v, base=10) - 1 # -1 to translate to 0 index + except ValueError: + raise ValueError(f'Given value "{v}" must be an integer.') + if i_v < 0 or i_v >= len(comp_arr): + raise ValueError(f'Given value "{v}" must be between 1 and {len(comp_arr)} inclusive.') + return i_v + + def _parse(ans: str) -> list[str] | None: + if ans in ("a", "all"): + return list(manifest.components) + if not ans: + return None + + enabled: list[str] = [] + for tok in ans.split(): + fr, sep, to = tok.partition("-") + if sep: + fr = _valid_v(fr) + to = _valid_v(to) + if fr > to: + raise ValueError(f'Given range "{tok}" must be lo-hi.') + enabled += comp_arr[fr : to + 1] + elif tok.startswith("^"): + t = _valid_v(tok[1:]) + enabled += comp_arr[:t] + comp_arr[t + 1 :] + else: + t = _valid_v(tok) + enabled.append(comp_arr[t]) + return list(set(enabled)) + + while True: + ans = prompt("", end="").lower().strip() + try: + enabled = _parse(ans) + except ValueError as e: + warn(f"invalid input. {e} Please try again.") + continue + + if enabled is not None: + manifest.resolve_components(enable=enabled) + return + + def deploy_configs(self, source: DotsSource, manifest: Manifest) -> None: + print() + log("Installing configs...") + deployer = Deployer() + for entry in manifest.enabled_entries(): + src = source.working_path(expand(entry.src)) + if not src.exists(): + warn(f"missing in source, skipping: {entry.src}") + continue + + dests = expand_dests(entry.dest) + if not dests: + warn(f"dest glob matched nothing, skipping: {entry.dest}") + continue + + for dest in dests: + deployer.place(src, Path(dest)) + info(f"{entry.src} -> {dest}") + + def install_packages(self, source: DotsSource, manifest: Manifest) -> tuple[str, list[str], dict[str, list[str]]]: + installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm) + + packages = manifest.enabled_packages() + if packages: + print() + log("Installing packages...") + installer.install(packages) + + local_packages = {} + local_dirs = manifest.enabled_local_packages() + if local_dirs: + print() + log("Building local packages...") + for path in local_dirs: + directory = source.working_path(path) + if not directory.is_dir(): + warn(f"missing in repo, skipping: {path}") + continue + + log(f"Building {path}...") + local_packages[path] = installer.build_install(directory) + + return getattr(installer, "helper", DEFAULT_AUR_HELPER), packages, local_packages + + def run_hooks(self, manifest: Manifest) -> None: + hooks = manifest.enabled_hooks("post_install") + if not hooks: + return + + print() + log("Running post-install hooks...") + env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)} + for hook in hooks: + info(f"Running hook: {hook}") + result = subprocess.run(hook, shell=True, env=env) + if result.returncode != 0: + warn(f"hook exited with {result.returncode}") + + def print_done(self) -> None: + print() + info("All done! Caelestia has been installed.") + info("A few things to finish up:") + info(" - A reboot is recommended for all changes take effect") + info(" - Edit `~/.config/caelestia/hypr-vars.conf` to set default apps, keybinds and much more") + info(" - Edit `~/.config/caelestia/hypr-user.conf` to set your monitor layout and other Hyprland configs") + info(" - Run `caelestia update` later to pull in the latest changes") + info("Enjoy! For support (or to just hang out), join our Discord server: https://discord.gg/BGDCFCmMBk") diff --git a/src/caelestia/subcommands/record.py b/src/caelestia/subcommands/record.py index 7a24ba74..372aee41 100644 --- a/src/caelestia/subcommands/record.py +++ b/src/caelestia/subcommands/record.py @@ -1,4 +1,3 @@ -import json import re import shutil import subprocess @@ -9,7 +8,7 @@ from pathlib import Path from caelestia.utils import hypr from caelestia.utils.notify import close_notification, notify -from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir, user_config_path +from caelestia.utils.paths import get_config, recording_notif_path, recording_path, recordings_dir RECORDER = "gpu-screen-recorder" @@ -65,12 +64,10 @@ class Command: if self.args.sound: args += ["-a", "default_output"] + config = get_config() try: - config = json.loads(user_config_path.read_text()) if "record" in config and "extraArgs" in config["record"]: args += config["record"]["extraArgs"] - except (json.JSONDecodeError, FileNotFoundError): - pass except TypeError as e: raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}") diff --git a/src/caelestia/subcommands/resizer.py b/src/caelestia/subcommands/resizer.py index fc126624..5edf3a35 100644 --- a/src/caelestia/subcommands/resizer.py +++ b/src/caelestia/subcommands/resizer.py @@ -7,8 +7,8 @@ from pathlib import Path from typing import Any, Dict, Optional from caelestia.utils import hypr -from caelestia.utils.logging import log_message -from caelestia.utils.paths import user_config_path +from caelestia.utils.io import error, fatal, info, log, warn +from caelestia.utils.paths import get_config class WindowRule: @@ -52,8 +52,8 @@ class Command: WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]), ] + config = get_config() try: - config = json.loads(user_config_path.read_text()) if "resizer" in config and "rules" in config["resizer"]: rules = [] for rule_config in config["resizer"]["rules"]: @@ -67,8 +67,8 @@ class Command: ) ) return rules - except (json.JSONDecodeError, KeyError): - log_message("ERROR: invalid config") + except KeyError: + warn("invalid config, falling back to default rules") except FileNotFoundError: pass @@ -188,12 +188,10 @@ class Command: command2 = self._make_move_cmd(int(move_x), int(move_y), address) hypr.batch(command1, command2) - log_message( - f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})" - ) + info(f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})") except Exception as e: - log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}") + error(f"failed to apply PiP action to window 0x{window_id}: {e}") def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool: dispatch_commands = [] @@ -214,10 +212,10 @@ class Command: try: hypr.batch(*dispatch_commands) - log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") + info(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") return True except Exception as e: - log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}") + error(f"failed to apply window actions for window 0x{window_id}: {e}") return False def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None: @@ -236,7 +234,7 @@ class Command: if re.search(rule.name, window_title): return rule except re.error: - log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'") + warn(f"invalid regex pattern in rule '{rule.name}'") return None @@ -258,7 +256,7 @@ class Command: window_id = window_id.lstrip(">") if not all(c in "0123456789abcdefABCDEF" for c in window_id): - log_message(f"ERROR: Invalid window ID format: {window_id}") + warn(f"invalid window ID format: {window_id}") return window_info = self._get_window_info(window_id) @@ -268,19 +266,19 @@ class Command: window_title = window_info.get("title", "") initial_title = window_info.get("initialTitle", "") - log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") + log(f"Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") rule = self._match_window_rule(window_title, initial_title) if rule: if self._is_rate_limited(window_id): - log_message(f"Rate limited: skipping window 0x{window_id}") + log(f"Rate limited: skipping window 0x{window_id}") return - log_message(f"Matched rule '{rule.name}' for window 0x{window_id}") + info(f"Matched rule '{rule.name}' for window 0x{window_id}") self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) except (IndexError, ValueError) as e: - log_message(f"ERROR: Failed to parse window title event: {e}") + warn(f"failed to parse window title event: {e}") def _handle_open_event(self, event: str) -> None: try: @@ -296,22 +294,22 @@ class Command: window_id = window_id.lstrip(">") if not all(c in "0123456789abcdefABCDEF" for c in window_id): - log_message(f"ERROR: Invalid window ID format: {window_id}") + warn(f"invalid window ID format: {window_id}") return - log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") + log(f"New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") rule = self._match_window_rule(title, title) if rule: if self._is_rate_limited(window_id): - log_message(f"Rate limited: skipping window 0x{window_id}") + log(f"Rate limited: skipping window 0x{window_id}") return - log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}") + info(f"Matched rule '{rule.name}' for new window 0x{window_id}") self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) except (IndexError, ValueError) as e: - log_message(f"ERROR: Failed to parse window open event: {e}") + warn(f"failed to parse window open event: {e}") def run(self) -> None: if self.args.daemon: @@ -324,7 +322,7 @@ class Command: ): self._run_active_mode() else: - print( + info( "Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode" ) @@ -333,28 +331,27 @@ class Command: try: active_window_result = hypr.message("activewindow") if not isinstance(active_window_result, dict) or not active_window_result.get("address"): - print("ERROR: No active window found") + error("no active window found") return address = active_window_result.get("address", "") if not isinstance(address, str) or not address.startswith("0x"): - print("ERROR: Invalid window address") + error("invalid window address") return window_id = address[2:] # Remove "0x" prefix window_title = active_window_result.get("title", "") if not active_window_result.get("floating", False): - print(f"Window '{window_title}' is not floating. PIP only works on floating windows.") - print("Try making it floating first with: hyprctl dispatch togglefloating") + warn(f"window '{window_title}' is not floating; PiP only works on floating windows.") return - print(f"Applying PIP to active window: '{window_title}'") + info(f"Applying PiP to active window: '{window_title}'") self._apply_pip_action(window_id) - print("PIP applied successfully") + info("PiP applied successfully") except Exception as e: - print(f"ERROR: Failed to apply PIP to active window: {e}") + error(f"failed to apply PiP to active window: {e}") def _run_active_mode(self) -> None: try: @@ -371,10 +368,10 @@ class Command: matching_windows = self._find_matching_windows(temp_rule) if not matching_windows: - print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") + warn(f"no windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") return - print(f"Found {len(matching_windows)} matching window(s)") + info(f"Found {len(matching_windows)} matching window(s)") # Apply rule to all matching windows success_count = 0 @@ -382,41 +379,41 @@ class Command: window_id = window["address"][2:] # Remove "0x" prefix window_title = window.get("title", "") - print(f"Applying rule to window 0x{window_id}: '{window_title}'") + info(f"Applying rule to window 0x{window_id}: '{window_title}'") success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) if success: success_count += 1 - print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") + info(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") except Exception as e: - print(f"ERROR: Failed to apply rule: {e}") + error(f"failed to apply rule: {e}") def _apply_to_active_window(self, temp_rule: WindowRule) -> None: """Apply rule only to the currently active window""" try: active_window_result = hypr.message("activewindow") if not isinstance(active_window_result, dict) or not active_window_result.get("address"): - print("ERROR: No active window found") + error("no active window found") return window_title = active_window_result.get("title", "") address = active_window_result.get("address", "") if not isinstance(address, str) or not address.startswith("0x"): - print("ERROR: Invalid window address") + error("invalid window address") return window_id = address[2:] # Remove "0x" prefix - print(f"Applying rule to active window 0x{window_id}: '{window_title}'") + info(f"Applying rule to active window 0x{window_id}: '{window_title}'") success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) if success: - print("Rule applied successfully") + info("Rule applied successfully") else: - print("Failed to apply rule") + error("failed to apply rule") except Exception as e: - print(f"ERROR: Failed to apply rule to active window: {e}") + error(f"failed to apply rule to active window: {e}") def _find_matching_windows(self, temp_rule: WindowRule) -> list: """Find all windows that match the given rule pattern""" @@ -445,7 +442,7 @@ class Command: try: matches = bool(re.search(temp_rule.name, window_title)) except re.error: - print(f"ERROR: Invalid regex pattern '{temp_rule.name}'") + warn(f"invalid regex pattern '{temp_rule.name}'") return [] if matches: @@ -454,23 +451,22 @@ class Command: return matching_windows except Exception as e: - print(f"ERROR: Failed to find matching windows: {e}") + error(f"failed to find matching windows: {e}") return [] def _run_daemon(self) -> None: - log_message("Hyprland window resizer started") - log_message(f"Loaded {len(self.window_rules)} window rules") + info("Hyprland window resizer started") + info(f"Loaded {len(self.window_rules)} window rules") socket_path = Path(hypr.socket2_path) if not socket_path.exists(): - log_message(f"ERROR: Hyprland socket not found at {socket_path}") - return + fatal(f"Hyprland socket not found at {socket_path}") try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: sock.connect(hypr.socket2_path) - log_message("Connected to Hyprland socket, listening for events...") + info("Connected to Hyprland socket, listening for events...") while True: data = sock.recv(4096).decode() @@ -480,6 +476,6 @@ class Command: self._handle_window_event(line) except KeyboardInterrupt: - log_message("Resizer daemon stopped") + info("Resizer daemon stopped") except Exception as e: - log_message(f"ERROR: {e}") + error(str(e)) diff --git a/src/caelestia/subcommands/toggle.py b/src/caelestia/subcommands/toggle.py index 56565f37..f114bbfa 100644 --- a/src/caelestia/subcommands/toggle.py +++ b/src/caelestia/subcommands/toggle.py @@ -6,7 +6,7 @@ from collections import ChainMap from typing import Any, Callable, cast from caelestia.utils import hypr -from caelestia.utils.paths import user_config_path +from caelestia.utils.paths import get_config def is_subset(superset, subset): @@ -103,8 +103,8 @@ class Command: }, } try: - self.cfg = DeepChainMap(json.loads(user_config_path.read_text())["toggles"], self.cfg) - except (FileNotFoundError, json.JSONDecodeError, KeyError): + self.cfg = DeepChainMap(get_config()["toggles"], self.cfg) + except KeyError: pass def run(self) -> None: diff --git a/src/caelestia/utils/dots/deployer.py b/src/caelestia/utils/dots/deployer.py new file mode 100644 index 00000000..e8f766bd --- /dev/null +++ b/src/caelestia/utils/dots/deployer.py @@ -0,0 +1,58 @@ +import shutil +import tempfile +from pathlib import Path + + +class Deployer: + """Places files from the dots clone into their destinations.""" + + def place(self, src: Path, dest: Path) -> None: + """Place a whole entry (file or directory tree), replacing any existing dest.""" + + if src.is_dir(): + self.place_dir(src, dest) + else: + self.place_file(src, dest) + + def place_dir(self, src: Path, dest: Path) -> None: + """Place a directory tree recursively, overwriting any existing dest files.""" + + if dest.is_symlink() or dest.is_file(): + self.remove(dest) + + dest.mkdir(parents=True, exist_ok=True) + for path in src.rglob("*"): + if path.is_file(): + self.place_file(path, dest / path.relative_to(src)) + elif path.is_dir(): + (dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True) + + def place_file(self, src: Path, dest: Path) -> None: + """Atomically place a single file, replacing any existing dest.""" + + if dest.is_dir() and not dest.is_symlink(): + self.remove(dest) + + dest.parent.mkdir(parents=True, exist_ok=True) + f = tempfile.NamedTemporaryFile(dir=dest.parent, delete=False) + f.close() + try: + shutil.copyfile(src, f.name) + shutil.copymode(src, f.name) + Path(f.name).replace(dest) + except BaseException: + Path(f.name).unlink() + raise + + def write_new(self, src: Path, dest: Path) -> Path: + """Write the upstream version alongside dest as .new and return that path.""" + + new_path = dest.parent / f"{dest.name}.new" + self.place_file(src, new_path) + return new_path + + def remove(self, path: Path) -> None: + if path.is_symlink() or path.is_file(): + path.unlink() + elif path.is_dir(): + shutil.rmtree(path) diff --git a/src/caelestia/utils/dots/manifest.py b/src/caelestia/utils/dots/manifest.py new file mode 100644 index 00000000..f4acfb56 --- /dev/null +++ b/src/caelestia/utils/dots/manifest.py @@ -0,0 +1,217 @@ +import glob +import os +import re +import tomllib +from dataclasses import dataclass, field +from pathlib import Path +from string import Template +from typing import Any + +_XDG_DEFAULTS = { + "XDG_CONFIG_HOME": str(Path.home() / ".config"), + "XDG_DATA_HOME": str(Path.home() / ".local/share"), + "XDG_STATE_HOME": str(Path.home() / ".local/state"), + "XDG_CACHE_HOME": str(Path.home() / ".cache"), +} +_GLOB_MAGIC = re.compile(r"[*?[]") +_LOCAL_PREFIX = "local:" + + +class ManifestError(Exception): + """Raised when manifest.toml is malformed.""" + + +class ComponentError(Exception): + """Raised when component flags are invalid or contradictory.""" + + +def expand(text: str) -> Path: + """Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path.""" + + env = {**_XDG_DEFAULTS, **os.environ} + return Path(Template(text).safe_substitute(env)).expanduser() + + +def expand_dests(dest: str) -> list[Path]: + """Expand globs within a dest path. + + Globs from the start until the segment with the last glob so subdirs are + created if they didn't exist previously. + """ + + expanded = expand(dest) + if not _GLOB_MAGIC.search(str(expanded)): + return [expanded] + + parts = expanded.parts + glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part)) + pattern = str(Path(*parts[: glob_idx + 1])) + tail = parts[glob_idx + 1 :] + return [Path(match, *tail) for match in sorted(glob.glob(pattern))] + + +@dataclass(frozen=True) +class ManifestEntry: + src: str + dest: str + + +@dataclass(frozen=True) +class ManifestComponent: + name: str + default: bool = False + packages: list[str] = field(default_factory=list) + entries: list[ManifestEntry] = field(default_factory=list) + post_install: list[str] = field(default_factory=list) + post_update: list[str] = field(default_factory=list) + + +@dataclass +class _ManifestData: + enabled_comps: list[str] = field(default_factory=list) + disabled_comps: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class Manifest: + components: dict[str, ManifestComponent] = field(default_factory=dict) + packages: list[str] = field(default_factory=list) + post_install: list[str] = field(default_factory=list) + post_update: list[str] = field(default_factory=list) + _data: _ManifestData = field(default_factory=_ManifestData, init=False, repr=False) + + @property + def enabled_components(self) -> list[str]: + return self._data.enabled_comps + + @property + def disabled_components(self) -> list[str]: + return self._data.disabled_comps + + @staticmethod + def parse(text: str) -> "Manifest": + try: + raw = tomllib.loads(text) + except tomllib.TOMLDecodeError as e: + raise ManifestError(f"invalid TOML: {e}") from e + + post_install = _validate_str_list(raw.get("post_install", []), "post_install") + post_update = _validate_str_list(raw.get("post_update", []), "post_update") + + packages = _validate_str_list(raw.get("packages", []), "packages") + + components = {} + for comp in raw.get("components", []): + parsed = _parse_component(comp) + components[parsed.name] = parsed + + return Manifest( + components=components, + packages=packages, + post_install=post_install, + post_update=post_update, + ) + + def resolve_components( + self, + enable: list[str] | None = None, + disable: list[str] | None = None, + ) -> None: + """Resolves enabled/disabled components. This MUST be called before calling any other method.""" + + enable_set = set(enable or []) + disable_set = set(disable or []) + known = set(self.components) + + for name in enable_set | disable_set: + if name not in known: + raise ComponentError(f"unknown component: {name}") + + conflict = enable_set & disable_set + if conflict: + raise ComponentError(f"component(s) both enabled and disabled: {', '.join(sorted(conflict))}") + + enabled = {name for name, comp in self.components.items() if comp.default} + enabled |= enable_set + enabled -= disable_set + + self._data.enabled_comps.clear() + self._data.disabled_comps.clear() + for name in self.components: + if name in enabled: + self._data.enabled_comps.append(name) + else: + self._data.disabled_comps.append(name) + + def enabled_entries(self) -> list[ManifestEntry]: + """The entries of every enabled component.""" + + entries: list[ManifestEntry] = [] + for name in self._data.enabled_comps: + entries.extend(self.components[name].entries) + return entries + + def enabled_hooks(self, kind: str) -> list[str]: + """Global + enabled components' hooks of the given kind.""" + + hooks = list(getattr(self, kind)) + for name in self._data.enabled_comps: + hooks.extend(getattr(self.components[name], kind)) + return hooks + + def enabled_packages(self) -> list[str]: + """Repo/AUR packages to install.""" + return [p for p in self._all_packages() if not p.startswith(_LOCAL_PREFIX)] + + def enabled_local_packages(self) -> list[str]: + """Local PKGBUILD dirs to build. + + Local packages are determined by a local: prefix and are + relative dirs instead of package names. + """ + return [p[len(_LOCAL_PREFIX) :] for p in self._all_packages() if p.startswith(_LOCAL_PREFIX)] + + def _all_packages(self) -> list[str]: + """The manifest's top-level packages plus enabled components', in manifest order. + + Top-level packages come first, then each enabled component's packages in + component order. Only the first occurrence of each package is kept. + """ + + seen: set[str] = set() + ordered: list[str] = [] + for pkg in (*self.packages, *(p for c in self._data.enabled_comps for p in self.components[c].packages)): + if pkg not in seen: + seen.add(pkg) + ordered.append(pkg) + return ordered + + +def _require_key(d: dict[str, Any], key: str, ctx: str) -> Any: + if key not in d: + raise ManifestError(f"{ctx}: missing required key '{key}'") + return d[key] + + +def _validate_str_list(value: Any, ctx: str) -> list[str]: + if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + raise ManifestError(f"{ctx}: expected a list of strings") + return value + + +def _parse_entry(d: Any) -> ManifestEntry: + if not isinstance(d, dict): + raise ManifestError("entry: expected a table") + return ManifestEntry(src=_require_key(d, "src", "entry"), dest=_require_key(d, "dest", "entry")) + + +def _parse_component(d: dict[str, Any]) -> ManifestComponent: + name = _require_key(d, "name", "component") + return ManifestComponent( + name=name, + default=bool(d.get("default", False)), + packages=_validate_str_list(d.get("packages", []), f"component '{name}' packages"), + entries=[_parse_entry(e) for e in d.get("entries", [])], + post_install=_validate_str_list(d.get("post_install", []), f"component '{name}' post_install"), + post_update=_validate_str_list(d.get("post_update", []), f"component '{name}' post_update"), + ) diff --git a/src/caelestia/utils/dots/packages.py b/src/caelestia/utils/dots/packages.py new file mode 100644 index 00000000..d967c423 --- /dev/null +++ b/src/caelestia/utils/dots/packages.py @@ -0,0 +1,130 @@ +import os +import shutil +import subprocess +import tempfile +from abc import ABC, abstractmethod +from pathlib import Path + +from caelestia.utils.io import fatal, info + +DEFAULT_AUR_HELPER = "paru" +AUR_HELPERS = DEFAULT_AUR_HELPER, "yay" + + +def _install_aur_helper(helper: str, noconfirm: bool = False) -> None: + pacman_cmd = ["sudo", "pacman", "-S", "--needed", "git", "base-devel"] + if noconfirm: + pacman_cmd.append("--noconfirm") + subprocess.run(pacman_cmd, check=True) + + repo_url = f"https://aur.archlinux.org/{helper}.git" + with tempfile.TemporaryDirectory() as repo_dir: + subprocess.run(["git", "clone", repo_url, repo_dir], check=True) + + makepkg_cmd = ["makepkg", "-si"] + if noconfirm: + makepkg_cmd.append("--noconfirm") + subprocess.run(makepkg_cmd, cwd=repo_dir, check=True) + + if helper == "yay": + subprocess.run(["yay", "-Y", "--gendb"], check=True) + subprocess.run(["yay", "-Y", "--devel", "--save"], check=True) + elif helper == "paru": + subprocess.run(["paru", "--gendb"], check=True) + + +class PackageInstaller(ABC): + @staticmethod + def get(helper: str | None = None, noconfirm: bool = False) -> "PackageInstaller": + """Pick a package installer: the requested/detected AUR helper on Arch, else a no-op.""" + + # Not on Arch, can't install packages + if shutil.which("pacman") is None: + return NoopInstaller() + + # Explicitly given + if helper: + if not shutil.which(helper): + if helper not in AUR_HELPERS: + fatal(f"given AUR helper {helper} is not installed and is unable to be installed automatically.") + + info(f"Given AUR helper not installed. Installing {helper}...") + _install_aur_helper(helper, noconfirm) + return ArchInstaller(helper, noconfirm) + + # Not given, find installed one + for candidate in AUR_HELPERS: + if shutil.which(candidate): + return ArchInstaller(candidate, noconfirm) + + info(f"No AUR helper found. Installing {DEFAULT_AUR_HELPER}...") + _install_aur_helper(DEFAULT_AUR_HELPER, noconfirm) + return ArchInstaller(DEFAULT_AUR_HELPER, noconfirm) + + # --- Abstract methods --- + + @abstractmethod + def install(self, packages: list[str]) -> None: ... + + @abstractmethod + def remove(self, packages: list[str]) -> None: ... + + @abstractmethod + def build_install(self, directory: Path) -> list[str]: + """Build and install the PKGBUILD in `directory`, returning the installed package names.""" + + +class NoopInstaller(PackageInstaller): + """Used off Arch, where the dots' packages are not available via pacman/AUR.""" + + def install(self, packages: list[str]) -> None: + if packages: + info(f"Skipping package install (not on Arch): {', '.join(packages)}") + + def remove(self, packages: list[str]) -> None: + if packages: + info(f"Skipping package removal (not on Arch): {', '.join(packages)}") + + def build_install(self, directory: Path) -> list[str]: + info(f"Skipping local package build (not on Arch): {directory}") + return [] + + +class ArchInstaller(PackageInstaller): + def __init__(self, helper: str, noconfirm: bool = False) -> None: + self.helper = helper + self.flags = ["--noconfirm"] if noconfirm else [] + + def install(self, packages: list[str], extra_flags: list[str] | None = None) -> None: + if not packages: + return + subprocess.run([self.helper, "-S", "--needed", *self.flags, *(extra_flags or []), *packages], check=True) + + def remove(self, packages: list[str]) -> None: + if not packages: + return + subprocess.run([self.helper, "-Rns", *self.flags, *packages], check=True) + + def build_install(self, directory: Path) -> list[str]: + srcinfo = subprocess.check_output(["makepkg", "--printsrcinfo"], cwd=directory, text=True) + names = [] + depends = [] + for line in srcinfo.splitlines(): + key, sep, value = line.partition("=") + if not sep: + continue + + key = key.strip() + if key == "pkgname": + names.append(value.strip()) + elif key == "depends": + depends.append(value.strip()) + + self.install(depends, extra_flags=["--asdeps"]) + + # Stop makepkg from resetting sudo + env = {**os.environ, "PACMAN_AUTH": "sudo"} + # -f = force, -s = sync deps, -i = install + subprocess.run(["makepkg", "-fsi", *self.flags], cwd=directory, env=env, check=True) + + return names diff --git a/src/caelestia/utils/dots/source.py b/src/caelestia/utils/dots/source.py new file mode 100644 index 00000000..d4f79fd8 --- /dev/null +++ b/src/caelestia/utils/dots/source.py @@ -0,0 +1,107 @@ +import shutil +import subprocess +from pathlib import Path + +from caelestia.utils.dots.manifest import Manifest +from caelestia.utils.paths import dots_dir, get_config + + +class SourceError(Exception): + """Raised when a git operation against the dots clone fails.""" + + +class DotsSource: + _fetched_source: bool = False + + def __init__(self) -> None: + cfg = get_config().get("dots", {}) + self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git") + self.branch = cfg.get("branch", "main") + + @property + def remote_ref(self) -> str: + return f"origin/{self.branch}" + + def exists(self) -> bool: + return (dots_dir / ".git").is_dir() + + def working_path(self, relpath: str | Path) -> Path: + """Get a Path relative to the dots dir.""" + return dots_dir / relpath + + def ensure(self) -> None: + """Clone the repo if absent, otherwise fetch the latest refs. + + If the configured url changed, the stale clone is removed and re-cloned + from the new source. + """ + + if self.exists(): + if self.current_url() == self.url: + if DotsSource._fetched_source: + return + + self._git("fetch", "--prune", "origin", self.branch) + DotsSource._fetched_source = True + return + shutil.rmtree(dots_dir) + + dots_dir.parent.mkdir(parents=True, exist_ok=True) + self._run("git", "clone", "--branch", self.branch, self.url, str(dots_dir)) + DotsSource._fetched_source = True + + def current_url(self) -> str: + return self._git("remote", "get-url", "origin").strip() + + def checkout_tip(self) -> str: + """Reset the working tree to the fetched tip and return its commit hash.""" + + self._git("reset", "--hard", self.remote_ref) + return self.tip_rev() + + def tip_rev(self) -> str: + return self._git("rev-parse", self.remote_ref).strip() + + def changed_files(self, base: str, head: str) -> list[str]: + """Repo-relative paths that differ between two revisions.""" + + out = self._git("diff", "--name-only", base, head) + return [line for line in out.splitlines() if line] + + def clean(self) -> None: + """Remove all untracked files in the git repo.""" + self._git("clean", "-fdx") + + # --- Accessors --- + + def manifest_at(self, ref: str) -> Manifest: + return Manifest.parse(self.text_at(ref, "manifest.toml")) + + def text_at(self, ref: str, relpath: str) -> str: + return self._git("show", f"{ref}:{relpath}") + + def blob_at(self, ref: str, relpath: str) -> bytes: + return self._git_bytes("show", f"{ref}:{relpath}") + + def files_at(self, ref: str, relpath: str) -> list[str]: + """Repo-relative paths of all files under relpath at ref (the path itself if it is a file).""" + + out = self._git("ls-tree", "-r", "--name-only", ref, "--", relpath) + return [line for line in out.splitlines() if line] + + # --- Helpers --- + + def _git(self, *args: str) -> str: + return self._run("git", "-C", str(dots_dir), *args) + + def _git_bytes(self, *args: str) -> bytes: + result = subprocess.run(["git", "-C", str(dots_dir), *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if result.returncode != 0: + raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed") + return result.stdout + + def _run(self, *cmd: str) -> str: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: + raise SourceError(result.stderr.strip() or f"{' '.join(cmd)} failed") + return result.stdout diff --git a/src/caelestia/utils/dots/state.py b/src/caelestia/utils/dots/state.py new file mode 100644 index 00000000..d47f3c96 --- /dev/null +++ b/src/caelestia/utils/dots/state.py @@ -0,0 +1,52 @@ +import json +from dataclasses import dataclass, field + +from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER +from caelestia.utils.io import warn +from caelestia.utils.paths import atomic_dump, dots_state_path + + +@dataclass +class DotsState: + # The AUR helper selected selected at install time + aur_helper: str = "paru" + + # The git rev of currently applied dots version + applied_rev: str | None = None + + # The currently enabled components + enabled_components: list[str] = field(default_factory=list) + + # Previously installed packages/local packages + packages: list[str] = field(default_factory=list) + local_packages: dict[str, list[str]] = field(default_factory=dict) + + @staticmethod + def load() -> "DotsState": + try: + data = json.loads(dots_state_path.read_text()) + except FileNotFoundError: + return DotsState() + except json.JSONDecodeError: + warn("failed to parse current dots state.") + return DotsState() + + return DotsState( + aur_helper=data.get("aur_helper", DEFAULT_AUR_HELPER), + applied_rev=data.get("applied_rev"), + enabled_components=data.get("enabled_components", []), + packages=data.get("packages", []), + local_packages=data.get("local_packages", {}), + ) + + def save(self) -> None: + atomic_dump( + dots_state_path, + { + "aur_helper": self.aur_helper, + "applied_rev": self.applied_rev, + "enabled_components": self.enabled_components, + "packages": self.packages, + "local_packages": self.local_packages, + }, + ) diff --git a/src/caelestia/utils/io.py b/src/caelestia/utils/io.py new file mode 100644 index 00000000..020ca45d --- /dev/null +++ b/src/caelestia/utils/io.py @@ -0,0 +1,88 @@ +import sys +from typing import Never + +LOG_COLOUR: int = 2 +INFO_COLOUR: int = 0 +PROMPT_COLOUR: int = 36 +WARNING_COLOUR: int = 33 +ERROR_COLOUR: int = 31 + +_disable_input: bool = False + + +def disable_input() -> None: + global _disable_input + _disable_input = True + + +def log_exception(func): + """Log exceptions to stderr instead of raising. + + Used by the `apply_()` functions so that an exception, when applying + a theme, does not prevent the other themes from being applied. + """ + + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except Exception as e: + error(f'exception during "{func.__name__}()": {str(e)}') + + return wrapper + + +def format_msg(colour: int, prefix: bool, msg: str) -> str: + return f"\033[{colour}m{':: ' if prefix else ''}{msg}\033[0m" + + +def log(msg: str, prefix: bool = True) -> None: + print(format_msg(LOG_COLOUR, prefix, msg)) + + +def info(msg: str, prefix: bool = True) -> None: + print(format_msg(INFO_COLOUR, prefix, msg)) + + +def warn(msg: str, prefix: bool = True) -> None: + print(format_msg(WARNING_COLOUR, prefix, f"Warning: {msg}")) + + +def error(err: str | Exception, prefix: bool = True) -> None: + print(format_msg(ERROR_COLOUR, prefix, f"Error: {err}"), file=sys.stderr) + + +def fatal(err: str | Exception, prefix: bool = True) -> Never: + print(format_msg(ERROR_COLOUR, prefix, f"Fatal: {err}"), file=sys.stderr) + sys.exit(1) + + +def _input(prompt: str) -> str: + if _disable_input: + print(prompt, end="") + return "" + + try: + return input(prompt) + except (KeyboardInterrupt, EOFError): + print() + raise KeyboardInterrupt() + + +def prompt(msg: str, prefix: bool = True, end: str = " ") -> str: + return _input(format_msg(PROMPT_COLOUR, prefix, msg) + end) + + +def confirm(msg: str, prefix: bool = True, default: bool = True) -> bool: + suffix = " [Y/n]" if default else " [y/N]" + answer = prompt(msg + suffix, prefix=prefix).strip().lower() + if not answer: + return default + return answer in ("y", "yes") + + +def pause() -> None: + if _disable_input: + return + + _input("\n\033[2m\033[3m(Ctrl+C to exit, enter to continue)\033[0m") + print("\033[1A\r\033[2K\033[1A\r\033[2K", end="") # Clear pause prompt diff --git a/src/caelestia/utils/logging.py b/src/caelestia/utils/logging.py deleted file mode 100644 index 228936e5..00000000 --- a/src/caelestia/utils/logging.py +++ /dev/null @@ -1,22 +0,0 @@ -from time import strftime - - -def log_message(message: str) -> None: - timestamp = strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}") - - -def log_exception(func): - """Log exceptions to stdout instead of raising - - Used by the `apply_()` functions so that an exception, when applying - a theme, does not prevent the other themes from being applied. - """ - - def wrapper(*args, **kwargs): - try: - func(*args, **kwargs) - except Exception as e: - log_message(f'Error during execution of "{func.__name__}()": {str(e)}') - - return wrapper diff --git a/src/caelestia/utils/paths.py b/src/caelestia/utils/paths.py index 3223b900..a4b50c2d 100644 --- a/src/caelestia/utils/paths.py +++ b/src/caelestia/utils/paths.py @@ -1,11 +1,12 @@ import hashlib import json import os -import shutil import tempfile from pathlib import Path from typing import Any +from caelestia.utils.io import warn + config_dir: Path = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) data_dir: Path = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local/share")) state_dir: Path = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local/state")) @@ -24,6 +25,10 @@ templates_dir: Path = cli_data_dir / "templates" user_templates_dir: Path = c_config_dir / "templates" theme_dir: Path = c_state_dir / "theme" +config_backup_dir: Path = config_dir.parent / f"{config_dir.name}.bak" +dots_dir: Path = c_state_dir / "dots" +dots_state_path: Path = c_state_dir / "dots-state.json" + scheme_path: Path = c_state_dir / "scheme.json" scheme_data_dir: Path = cli_data_dir / "schemes" scheme_cache_dir: Path = c_cache_dir / "schemes" @@ -52,8 +57,29 @@ def compute_hash(path: Path | str) -> str: return sha.hexdigest() +def atomic_write(path: Path, content: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + f = tempfile.NamedTemporaryFile("w", dir=path.parent, delete=False) + try: + with f: + f.write(content) + f.flush() + os.fsync(f.fileno()) + os.replace(f.name, path) + except BaseException: + os.unlink(f.name) + raise + + def atomic_dump(path: Path, content: dict[str, Any]) -> None: - with tempfile.NamedTemporaryFile("w") as f: - json.dump(content, f) - f.flush() - shutil.move(f.name, path) + atomic_write(path, json.dumps(content)) + + +def get_config() -> dict[str, Any]: + try: + return json.loads(user_config_path.read_text()) + except json.JSONDecodeError: + warn("failed to parse config, invalid JSON") + except FileNotFoundError: + pass + return {} diff --git a/src/caelestia/utils/theme.py b/src/caelestia/utils/theme.py index deec33a0..472cfb8e 100644 --- a/src/caelestia/utils/theme.py +++ b/src/caelestia/utils/theme.py @@ -8,18 +8,19 @@ import tempfile from pathlib import Path from caelestia.utils.colour import get_dynamic_colours -from caelestia.utils.logging import log_exception +from caelestia.utils.hypr import is_lua_config +from caelestia.utils.io import log_exception from caelestia.utils.paths import ( + atomic_write, c_state_dir, config_dir, data_dir, + get_config, templates_dir, theme_dir, - user_config_path, user_templates_dir, ) from caelestia.utils.scheme import get_scheme -from caelestia.utils.hypr import is_lua_config def gen_conf(colours: dict[str, str]) -> str: @@ -28,6 +29,7 @@ def gen_conf(colours: dict[str, str]) -> str: conf += f"${name} = {colour}\n" return conf + def gen_lua(colours: dict[str, str]) -> str: lua = "return {\n" for name, colour in colours.items(): @@ -35,6 +37,7 @@ def gen_lua(colours: dict[str, str]) -> str: lua += "}" return lua + def gen_scss(colours: dict[str, str]) -> str: scss = "" for name, colour in colours.items(): @@ -117,15 +120,6 @@ def gen_sequences(colours: dict[str, str]) -> str: ) -def write_file(path: Path, content: str) -> None: - path.parent.mkdir(parents=True, exist_ok=True) - - with tempfile.NamedTemporaryFile("w") as f: - f.write(content) - f.flush() - shutil.move(f.name, path) - - @log_exception def apply_terms(sequences: str) -> None: state = c_state_dir / "sequences.txt" @@ -152,57 +146,55 @@ def apply_terms(sequences: str) -> None: @log_exception def apply_hypr(conf: str) -> None: ext = "lua" if is_lua_config() else "conf" - write_file(config_dir / f"hypr/scheme/current.{ext}", conf) + atomic_write(config_dir / f"hypr/scheme/current.{ext}", conf) @log_exception def apply_discord(scss: str) -> None: - import tempfile - with tempfile.TemporaryDirectory("w") as tmp_dir: (Path(tmp_dir) / "_colours.scss").write_text(scss) conf = subprocess.check_output(["sass", "-I", tmp_dir, templates_dir / "discord.scss"], text=True) for client in "Equicord", "Vencord", "BetterDiscord", "equibop", "vesktop", "legcord": - write_file(config_dir / client / "themes/caelestia.theme.css", conf) + atomic_write(config_dir / client / "themes/caelestia.theme.css", conf) @log_exception def apply_pandora(colours: dict[str, str], mode: str) -> None: template = gen_replace(colours, templates_dir / "pandora.json", hash=True) template = template.replace("{{ $mode }}", mode) - write_file(data_dir / "PandoraLauncher/themes/caelestia.json", template) + atomic_write(data_dir / "PandoraLauncher/themes/caelestia.json", template) @log_exception def apply_spicetify(colours: dict[str, str], mode: str) -> None: template = gen_replace(colours, templates_dir / f"spicetify-{mode}.ini") - write_file(config_dir / "spicetify/Themes/caelestia/color.ini", template) + atomic_write(config_dir / "spicetify/Themes/caelestia/color.ini", template) @log_exception def apply_fuzzel(colours: dict[str, str]) -> None: template = gen_replace(colours, templates_dir / "fuzzel.ini") - write_file(config_dir / "fuzzel/fuzzel.ini", template) + atomic_write(config_dir / "fuzzel/fuzzel.ini", template) @log_exception def apply_btop(colours: dict[str, str]) -> None: template = gen_replace(colours, templates_dir / "btop.theme", hash=True) - write_file(config_dir / "btop/themes/caelestia.theme", template) + atomic_write(config_dir / "btop/themes/caelestia.theme", template) subprocess.run(["killall", "-USR2", "btop"], stderr=subprocess.DEVNULL) @log_exception def apply_nvtop(colours: dict[str, str]) -> None: template = gen_replace(colours, templates_dir / "nvtop.colors", hash=True) - write_file(config_dir / "nvtop/nvtop.colors", template) + atomic_write(config_dir / "nvtop/nvtop.colors", template) @log_exception def apply_htop(colours: dict[str, str]) -> None: template = gen_replace(colours, templates_dir / "htop.theme", hash=True) - write_file(config_dir / "htop/htoprc", template) + atomic_write(config_dir / "htop/htoprc", template) subprocess.run(["killall", "-USR2", "htop"], stderr=subprocess.DEVNULL) @@ -318,8 +310,8 @@ def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None) for gtk_version in ["gtk-3.0", "gtk-4.0"]: gtk_config_dir = config_dir / gtk_version - write_file(gtk_config_dir / "gtk.css", gtk_template) - write_file(gtk_config_dir / "thunar.css", thunar_template) + atomic_write(gtk_config_dir / "gtk.css", gtk_template) + atomic_write(gtk_config_dir / "thunar.css", thunar_template) subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/gtk-theme", "'adw-gtk3-dark'"]) subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/color-scheme", f"'prefer-{mode}'"]) @@ -332,13 +324,13 @@ def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None) @log_exception def apply_qt(colours: dict[str, str], mode: str, icon_theme: str | None = None) -> None: colours = gen_replace(colours, templates_dir / f"qt{mode}.colors", hash=True) - write_file(config_dir / "qtengine/caelestia.colors", colours) + atomic_write(config_dir / "qtengine/caelestia.colors", colours) config = (templates_dir / "qtengine.json").read_text() config = config.replace("{{ $mode }}", mode.capitalize()) if icon_theme is not None: config = config.replace(f'"iconTheme": "Papirus-{mode.capitalize()}"', f'"iconTheme": "{icon_theme}"') - write_file(config_dir / "qtengine/config.json", config) + atomic_write(config_dir / "qtengine/config.json", config) @log_exception @@ -347,7 +339,7 @@ def apply_warp(colours: dict[str, str], mode: str) -> None: template = gen_replace(colours, templates_dir / "warp.yaml", hash=True) template = template.replace("{{ $warp_mode }}", warp_mode) - write_file(data_dir / "warp-terminal/themes/caelestia.yaml", template) + atomic_write(data_dir / "warp-terminal/themes/caelestia.yaml", template) @log_exception @@ -369,7 +361,7 @@ def apply_chromium(colours: dict[str, str]) -> None: print(f"Unable to create {policy_dir} directory") continue - # Use tee instead of write_file cause we need sudo + # Use tee instead of atomic_write cause we need sudo subprocess.run( ["sudo", "-n", "tee", str(policy_dir / "caelestia.json")], input=json.dumps({"BrowserThemeColor": theme_color, "BrowserColorScheme": "device"}), @@ -392,13 +384,13 @@ def apply_zed(colours: dict[str, str], mode: str) -> None: theme_path.unlink() content = gen_replace_dynamic(colours, templates_dir / "zed.json", mode) - write_file(theme_path, content) + atomic_write(theme_path, content) @log_exception def apply_cava(colours: dict[str, str]) -> None: template = gen_replace(colours, templates_dir / "cava.conf", hash=True) - write_file(config_dir / "cava/config", template) + atomic_write(config_dir / "cava/config", template) subprocess.run(["killall", "-USR2", "cava"], stderr=subprocess.DEVNULL) @@ -410,7 +402,7 @@ def apply_user_templates(colours: dict[str, str], mode: str) -> None: for file in user_templates_dir.iterdir(): if file.is_file(): content = gen_replace_dynamic(colours, file, mode) - write_file(theme_dir / file.name, content) + atomic_write(theme_dir / file.name, content) def apply_colours(colours: dict[str, str], mode: str) -> None: @@ -425,10 +417,7 @@ def apply_colours(colours: dict[str, str], mode: str) -> None: except BlockingIOError: return - try: - cfg = json.loads(user_config_path.read_text())["theme"] - except (FileNotFoundError, json.JSONDecodeError, KeyError): - cfg = {} + cfg = get_config().get("theme", {}) def check(key: str) -> bool: return cfg[key] if key in cfg else True diff --git a/src/caelestia/utils/wallpaper.py b/src/caelestia/utils/wallpaper.py index 227a8fd5..013b39a7 100644 --- a/src/caelestia/utils/wallpaper.py +++ b/src/caelestia/utils/wallpaper.py @@ -2,7 +2,6 @@ import json import os import random import subprocess - from argparse import Namespace from pathlib import Path from typing import cast @@ -11,12 +10,12 @@ from materialyoucolor.hct import Hct from materialyoucolor.utils.color_utils import argb_from_rgb from PIL import Image +from caelestia.utils.colourfulness import get_variant from caelestia.utils.hypr import message from caelestia.utils.material import get_colours_for_image -from caelestia.utils.colourfulness import get_variant from caelestia.utils.paths import ( compute_hash, - user_config_path, + get_config, wallpaper_link_path, wallpaper_path_path, wallpaper_thumbnail_path, @@ -186,26 +185,23 @@ def set_wallpaper(wall: Path, no_smart: bool) -> None: apply_colours(scheme.colours, scheme.mode) # Run custom post-hook if configured - try: - cfg = json.loads(user_config_path.read_text()).get("wallpaper", {}) - if post_hook := cfg.get("postHook"): - subprocess.run( - post_hook, - shell=True, - env={ - **os.environ, - "WALLPAPER_PATH": str(wall), - "SCHEME_NAME": scheme.name, - "SCHEME_FLAVOUR": scheme.flavour, - "SCHEME_MODE": scheme.mode, - "SCHEME_VARIANT": scheme.variant, - "SCHEME_COLOURS": json.dumps(scheme.colours), - "THUMBNAIL_PATH": str(thumb), - }, - stderr=subprocess.DEVNULL, - ) - except (FileNotFoundError, json.JSONDecodeError): - pass + cfg = get_config().get("wallpaper", {}) + if post_hook := cfg.get("postHook"): + subprocess.run( + post_hook, + shell=True, + env={ + **os.environ, + "WALLPAPER_PATH": str(wall), + "SCHEME_NAME": scheme.name, + "SCHEME_FLAVOUR": scheme.flavour, + "SCHEME_MODE": scheme.mode, + "SCHEME_VARIANT": scheme.variant, + "SCHEME_COLOURS": json.dumps(scheme.colours), + "THUMBNAIL_PATH": str(thumb), + }, + stderr=subprocess.DEVNULL, + ) def set_random(args: Namespace) -> None: