diff --git a/completions/caelestia.fish b/completions/caelestia.fish index 0759525..32273e7 100644 --- a/completions/caelestia.fish +++ b/completions/caelestia.fish @@ -1,7 +1,7 @@ set -l seen '__fish_seen_subcommand_from' set -l has_opt '__fish_contains_opt' -set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install +set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install update set -l not_seen "not $seen $commands" # Disable file completions @@ -21,6 +21,7 @@ complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities' complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper' complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer' complete -c caelestia -n $not_seen -a 'install' -d 'Install the Caelestia dotfiles' +complete -c caelestia -n $not_seen -a 'update' -d 'Update the Caelestia dotfiles' # Shell set -l commands mpris drawers wallpaper notifs @@ -133,3 +134,7 @@ complete -c caelestia -n "$seen install" -l 'aur-helper' -d 'The AUR helper to u complete -c caelestia -n "$seen install" -l 'enable-components' -d 'List of components to enable' -r complete -c caelestia -n "$seen install" -l 'disable-components' -d 'List of components to disable' -r complete -c caelestia -n "$seen install" -l 'noconfirm' -d 'Use defaults for all prompts' + +# Update +complete -c caelestia -n "$seen update" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r +complete -c caelestia -n "$seen update" -l 'noconfirm' -d 'Use defaults for all prompts' diff --git a/src/caelestia/parser.py b/src/caelestia/parser.py index 2e24ed8..d6d8f45 100644 --- a/src/caelestia/parser.py +++ b/src/caelestia/parser.py @@ -11,9 +11,11 @@ from caelestia.subcommands import ( screenshot, shell, toggle, + update, wallpaper, ) from caelestia.utils.dots.manifest import Manifest +from caelestia.utils.dots.packages import AUR_HELPERS from caelestia.utils.dots.source import DotsSource from caelestia.utils.io import warn from caelestia.utils.paths import wallpapers_dir @@ -150,7 +152,7 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]: formatter_class=argparse.RawDescriptionHelpFormatter, ) install_parser.set_defaults(cls=install.Command) - install_parser.add_argument("--aur-helper", choices=["yay", "paru"], help="the AUR helper to use") + install_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use") install_parser.add_argument( "--enable-components", metavar="LIST", help="comma-separated list of components to enable" ) @@ -160,6 +162,12 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]: install_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts") _set_install_epilog(install_parser) + # Create parser for update opts + update_parser = command_parser.add_parser("update", help="update the Caelestia dotfiles") + update_parser.set_defaults(cls=update.Command) + update_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use") + update_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts") + return parser, parser.parse_args() diff --git a/src/caelestia/subcommands/install.py b/src/caelestia/subcommands/install.py index a8bc830..6364b20 100644 --- a/src/caelestia/subcommands/install.py +++ b/src/caelestia/subcommands/install.py @@ -1,20 +1,18 @@ -import os import shutil -import subprocess import textwrap from argparse import Namespace from pathlib import Path from caelestia.utils.dots.deployer import Deployer -from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError, expand, expand_dests +from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError +from caelestia.utils.dots.misc import build_local_packages, run_hooks from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER, PackageInstaller from caelestia.utils.dots.source import DotsSource, SourceError from caelestia.utils.dots.state import DotsState -from caelestia.utils.io import PROMPT_COLOUR, confirm, disable_input, fatal, format_msg, info, log, pause, prompt, warn +from caelestia.utils.io import confirm, disable_input, fatal, info, log, pause, prompt_selection, warn from caelestia.utils.paths import ( config_backup_dir, config_dir, - dots_dir, ) @@ -38,9 +36,9 @@ class Command: self.create_backup() source, tip, manifest = self.fetch_manifest() - self.deploy_configs(source, manifest) + deployed = self.deploy_configs(source, manifest) helper, packages, local_packages = self.install_packages(source, manifest) - self.run_hooks(manifest) + run_hooks(manifest, "post_install") DotsState( aur_helper=helper, @@ -48,6 +46,7 @@ class Command: enabled_components=manifest.enabled_components, packages=packages, local_packages=local_packages, + deployed_files=deployed, ).save() self.print_done() @@ -108,10 +107,14 @@ class Command: disable = _parse_list_arg(self.args.disable_components) try: manifest = source.manifest_at(tip) - manifest.resolve_components(enable=enable, disable=disable) + # No flags given, prompt user for non-default components if enable is None and disable is None: - self.prompt_optional_components(manifest) + optional = [name for name, comp in manifest.components.items() if not comp.default] + if optional: + enable = prompt_selection(optional, "Components to enable?") + + manifest.resolve_components(enable=enable, disable=disable) except (SourceError, ManifestError, ComponentError) as e: fatal(e) @@ -120,72 +123,17 @@ class Command: return source, tip, manifest - def prompt_optional_components(self, manifest: Manifest) -> None: - comp_arr = manifest.disabled_components - if not comp_arr: - return - - print(format_msg(PROMPT_COLOUR, True, "Components to enable?")) - max_idx_w = len(str(len(comp_arr))) - for i, comp in enumerate(comp_arr): - print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{comp}")) - print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)")) - - def _valid_v(v: str) -> int: - try: - i_v = int(v, base=10) - 1 # -1 to translate to 0 index - except ValueError: - raise ValueError(f'Given value "{v}" must be an integer.') - if i_v < 0 or i_v >= len(comp_arr): - raise ValueError(f'Given value "{v}" must be between 1 and {len(comp_arr)} inclusive.') - return i_v - - def _parse(ans: str) -> list[str] | None: - if ans in ("a", "all"): - return list(manifest.components) - if not ans: - return None - - enabled: list[str] = [] - for tok in ans.split(): - fr, sep, to = tok.partition("-") - if sep: - fr = _valid_v(fr) - to = _valid_v(to) - if fr > to: - raise ValueError(f'Given range "{tok}" must be lo-hi.') - enabled += comp_arr[fr : to + 1] - elif tok.startswith("^"): - t = _valid_v(tok[1:]) - enabled += comp_arr[:t] + comp_arr[t + 1 :] - else: - t = _valid_v(tok) - enabled.append(comp_arr[t]) - return list(set(enabled)) - - while True: - ans = prompt("", end="").lower().strip() - try: - enabled = _parse(ans) - except ValueError as e: - warn(f"invalid input. {e} Please try again.") - continue - - if enabled is not None: - manifest.resolve_components(enable=enabled) - return - - def deploy_configs(self, source: DotsSource, manifest: Manifest) -> None: + def deploy_configs(self, source: DotsSource, manifest: Manifest) -> dict[str, str]: print() log("Installing configs...") deployer = Deployer() for entry in manifest.enabled_entries(): - src = source.working_path(expand(entry.src)) + src = source.working_path(entry.expanded_src()) if not src.exists(): warn(f"missing in source, skipping: {entry.src}") continue - dests = expand_dests(entry.dest) + dests = entry.expanded_dests() if not dests: warn(f"dest glob matched nothing, skipping: {entry.dest}") continue @@ -194,6 +142,8 @@ class Command: deployer.place(src, Path(dest)) info(f"{entry.src} -> {dest}") + return deployer.deployed_files + def install_packages(self, source: DotsSource, manifest: Manifest) -> tuple[str, list[str], dict[str, list[str]]]: installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm) @@ -208,31 +158,10 @@ class Command: if local_dirs: print() log("Building local packages...") - for path in local_dirs: - directory = source.working_path(path) - if not directory.is_dir(): - warn(f"missing in repo, skipping: {path}") - continue - - log(f"Building {path}...") - local_packages[path] = installer.build_install(directory) + local_packages = build_local_packages(installer, source, local_dirs) return getattr(installer, "helper", DEFAULT_AUR_HELPER), packages, local_packages - def run_hooks(self, manifest: Manifest) -> None: - hooks = manifest.enabled_hooks("post_install") - if not hooks: - return - - print() - log("Running post-install hooks...") - env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)} - for hook in hooks: - info(f"Running hook: {hook}") - result = subprocess.run(hook, shell=True, env=env) - if result.returncode != 0: - warn(f"hook exited with {result.returncode}") - def print_done(self) -> None: print() info("All done! Caelestia has been installed.") diff --git a/src/caelestia/subcommands/update.py b/src/caelestia/subcommands/update.py new file mode 100644 index 0000000..294e823 --- /dev/null +++ b/src/caelestia/subcommands/update.py @@ -0,0 +1,227 @@ +import sys +from argparse import Namespace +from pathlib import Path + +from caelestia.utils.dots.deployer import Deployer +from caelestia.utils.dots.diff import Changeset +from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError +from caelestia.utils.dots.misc import build_local_packages, run_hooks +from caelestia.utils.dots.packages import PackageInstaller +from caelestia.utils.dots.source import DotsSource, SourceError +from caelestia.utils.dots.state import DotsState +from caelestia.utils.io import disable_input, fatal, info, log, prompt_selection, warn + + +class Command: + args: Namespace + + def __init__(self, args: Namespace) -> None: + self.args = args + + def run(self) -> None: + if self.args.noconfirm: + disable_input() + + state = DotsState.load() + if state.applied_rev is None: + fatal("dots not installed yet. Run `caelestia install` first.") + + # Run system update + installer = PackageInstaller.get(self.args.aur_helper or state.aur_helper, self.args.noconfirm) + installer.system_update() + + # Get manifest or exit if up to date + source, tip, manifest = self.fetch_manifest(state, state.applied_rev) + + # Apply file changes + entries = manifest.enabled_entries() + try: + changeset = Changeset.compute(source, state.applied_rev, tip, entries, state.deployed_files) + source.checkout_tip() + except SourceError as e: + fatal(e) + new_files, revived_files, placed = self.deploy_changeset(source, changeset) + + # Persist file changes immediately so a later failure can't lose track of them + deployed = dict(state.deployed_files) + for dest in (*changeset.deletes, *changeset.stale, *changeset.untracked): + deployed.pop(str(dest), None) + for repofile, dest in changeset.remap: + deployed[str(dest)] = repofile + deployed.update(placed) + state.deployed_files = deployed + state.save() + + # Install new/remove old packages + desired = manifest.enabled_packages() + state.packages = self.sync_packages(installer, state.packages, desired) + state.save() + + # Install new/remove old local PKGBUILD packages + desired_local = manifest.enabled_local_packages() + state.local_packages = self.sync_local_packages(installer, source, state.local_packages, desired_local) + state.save() + + # Run hooks + run_hooks(manifest, "post_update") + + # Mark the new revision applied + state.applied_rev = tip + state.enabled_components = manifest.enabled_components + state.aur_helper = getattr(installer, "helper", state.aur_helper) + state.save() + + self.summarize(changeset, new_files, revived_files) + + def fetch_manifest(self, state: DotsState, applied_rev: str) -> tuple[DotsSource, str, Manifest]: + print() + log("Fetching dots repo...") + source = DotsSource() + try: + source.ensure() + tip = source.tip_rev() + if tip == applied_rev: + info("Dots already up to date.") + sys.exit(0) + + manifest = source.manifest_at(tip) + if source.has_rev(applied_rev): + known = set(source.manifest_at(applied_rev).components) + else: + # Treat all components as known if rev is invalid so we don't overwrite existing prefs + known = set(manifest.components) + except (SourceError, ManifestError) as e: + fatal(e) + + # Enable components recorded at install time + any new components that are default on + enabled = [ + name + for name, comp in manifest.components.items() + if name in state.enabled_components or (name not in known and comp.default) + ] + + # Let the user opt into any new optional components + new_comps = [name for name, comp in manifest.components.items() if name not in known and not comp.default] + if new_comps: + info(f"New components: {', '.join(new_comps)}") + enabled += prompt_selection(new_comps, "Components to enable?") + + disabled = [name for name in manifest.components if name not in enabled] + try: + manifest.resolve_components(enable=enabled, disable=disabled) + except ComponentError as e: + fatal(e) + + info(f"Enabled components: {', '.join(enabled) or 'none'}") + + return source, tip, manifest + + def deploy_changeset( + self, source: DotsSource, changeset: Changeset + ) -> tuple[list[Path], list[Path], dict[str, str]]: + print() + + if changeset.is_empty(): + info("No configs to update.") + return [], [], {} + + log("Updating configs...") + deployer = Deployer() + + for repofile, dest in changeset.place: + src = source.working_path(repofile) + if not src.exists(): + warn(f"missing in source, skipping: {repofile}") + continue + deployer.place_file(src, dest) + info(f"{repofile} -> {dest}") + + new_files = [] + for repofile, dest in changeset.conflicts: + src = source.working_path(repofile) + if not src.exists(): + warn(f"missing in source, skipping: {repofile}") + continue + new_path = deployer.write_new(src, dest) + new_files.append(new_path) + warn(f"{dest} has local changes; upstream version written as {new_path.name}") + + revived_files = [] + for repofile, dest in changeset.deleted_changed: + src = source.working_path(repofile) + if not src.exists(): + warn(f"missing in source, skipping: {repofile}") + continue + new_path = deployer.write_new(src, dest) + revived_files.append(new_path) + warn(f"{dest} was removed but changed upstream; upstream version written as {new_path.name}") + + for dest in changeset.deletes: + deployer.remove(dest) + deployer.prune_empty_dirs(dest, Path.home()) + info(f"Removed {dest}") + + return new_files, revived_files, deployer.deployed_files + + def sync_packages(self, installer: PackageInstaller, current: list[str], desired: list[str]) -> list[str]: + to_install = [p for p in desired if p not in current] + to_remove = [p for p in current if p not in desired] + installed = list(current) + + if to_install: + print() + info(f"Installing new packages: {', '.join(to_install)}") + installer.install(to_install) + installed.extend(p for p in to_install if p not in installed) + + if to_remove: + print() + info(f"Packages no longer required: {', '.join(to_remove)}") + selected = prompt_selection(to_remove, "Packages to remove?") + if selected: + installer.remove(selected) + installed = [p for p in installed if p not in selected] + + return installed + + def sync_local_packages( + self, installer: PackageInstaller, source: DotsSource, current: dict[str, list[str]], desired: list[str] + ) -> dict[str, list[str]]: + to_build = [p for p in desired if p not in current] + to_remove = [p for p in current if p not in desired] + installed = dict(current) + + if to_build: + print() + log(f"Building new local packages: {', '.join(to_build)}") + installed.update(build_local_packages(installer, source, to_build)) + + if to_remove: + print() + info(f"Local packages no longer required: {', '.join(to_remove)}") + selected = prompt_selection(to_remove, "Local packages to remove?") + if selected: + installer.remove([pkg for path in selected for pkg in current[path]]) + for path in selected: + installed.pop(path, None) + + return installed + + def summarize(self, changeset: Changeset, new_files: list[Path], revived_files: list[Path]) -> None: + print() + conflicts = len(new_files) + len(revived_files) + info(f"Updated {len(changeset.place)} file(s), removed {len(changeset.deletes)}, {conflicts} conflict(s).") + if new_files: + info("The following files were changed upstream but you had edited them locally.") + info("Your versions were kept; the upstream versions were written alongside as .new:") + for path in new_files: + info(f" {path}") + if revived_files: + info("These files were removed by you but changed upstream, so were not restored.") + info("The upstream versions were written alongside as .new:") + for path in revived_files: + info(f" {path}") + if changeset.stale: + info("These files are no longer managed but differ from what was installed, so were kept:") + for path in changeset.stale: + info(f" {path}") diff --git a/src/caelestia/utils/dots/deployer.py b/src/caelestia/utils/dots/deployer.py index e8f766b..9ec3bb4 100644 --- a/src/caelestia/utils/dots/deployer.py +++ b/src/caelestia/utils/dots/deployer.py @@ -2,10 +2,18 @@ import shutil import tempfile from pathlib import Path +from caelestia.utils.paths import cache_dir, config_dir, data_dir, dots_dir, state_dir + +# Dirs to never prune even if empty +_PROTECTED_DIRS = frozenset({Path.home(), config_dir, data_dir, state_dir, cache_dir}) + class Deployer: """Places files from the dots clone into their destinations.""" + def __init__(self): + self.deployed_files: dict[str, str] = {} + def place(self, src: Path, dest: Path) -> None: """Place a whole entry (file or directory tree), replacing any existing dest.""" @@ -27,7 +35,7 @@ class Deployer: elif path.is_dir(): (dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True) - def place_file(self, src: Path, dest: Path) -> None: + def place_file(self, src: Path, dest: Path, record: bool = True) -> None: """Atomically place a single file, replacing any existing dest.""" if dest.is_dir() and not dest.is_symlink(): @@ -44,11 +52,15 @@ class Deployer: Path(f.name).unlink() raise + if record: + # Keep relative to dots dir + self.deployed_files[str(dest)] = str(src.relative_to(dots_dir)) + def write_new(self, src: Path, dest: Path) -> Path: """Write the upstream version alongside dest as .new and return that path.""" new_path = dest.parent / f"{dest.name}.new" - self.place_file(src, new_path) + self.place_file(src, new_path, record=False) return new_path def remove(self, path: Path) -> None: @@ -56,3 +68,17 @@ class Deployer: path.unlink() elif path.is_dir(): shutil.rmtree(path) + + def prune_empty_dirs(self, start: Path, stop: Path) -> None: + """Removes dirs recursively from start to stop. + + Will never prune protected dirs (home, config, cache, etc). + """ + + parent = start.parent + while parent != stop and stop in parent.parents and parent not in _PROTECTED_DIRS: + try: + parent.rmdir() + except OSError: + break + parent = parent.parent diff --git a/src/caelestia/utils/dots/diff.py b/src/caelestia/utils/dots/diff.py new file mode 100644 index 0000000..ae4e576 --- /dev/null +++ b/src/caelestia/utils/dots/diff.py @@ -0,0 +1,153 @@ +from dataclasses import dataclass, field +from pathlib import Path + +from caelestia.utils.dots.manifest import ManifestEntry +from caelestia.utils.dots.source import DotsSource, SourceError +from caelestia.utils.io import warn + + +class _Continue(Exception): + """Signals the deployed-files loop to skip to the next entry.""" + + +def _read_local(path: Path) -> bytes | None: + """Read a local file, returning None if it can't be read (perms, is a dir, etc.).""" + + try: + return path.read_bytes() + except OSError: + return None + + +@dataclass(frozen=True) +class Changeset: + place: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) to fast-forward + conflicts: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) -> write .new + deletes: list[Path] = field(default_factory=list) # We placed it, upstream removed it, unmodified + stale: list[Path] = field(default_factory=list) # Upstream removed it but user modified it + deleted_changed: list[tuple[str, Path]] = field(default_factory=list) # User deleted it, upstream changed -> .new + untracked: list[Path] = field(default_factory=list) # Gone + no longer managed; drop from state + remap: list[tuple[str, Path]] = field(default_factory=list) # Up to date but source path moved; restate mapping + + def is_empty(self) -> bool: + return not (self.place or self.conflicts or self.deletes or self.stale or self.deleted_changed) + + @staticmethod + def compute( + source: DotsSource, + applied_rev: str, + tip: str, + entries: list[ManifestEntry], + deployed: dict[str, str], + ) -> "Changeset": + """Collect all file changes needed into a Changeset.""" + + has_base = source.has_rev(applied_rev) + if not has_base: + warn( + "the previously applied revision is missing from the dots clone; files that differ " + "from the latest version will be written as .new instead of updated in place." + ) + + changed = set(source.changed_files(applied_rev, tip)) if has_base else set() + place: list[tuple[str, Path]] = [] + conflicts: list[tuple[str, Path]] = [] + deletes: list[Path] = [] + stale: list[Path] = [] + deleted_changed: list[tuple[str, Path]] = [] + untracked: list[Path] = [] + remap: list[tuple[str, Path]] = [] + + # Collect all files to deploy (entry sources can be dirs so we recurse into them) + to_deploy: dict[Path, str] = {} + for entry in entries: + src_root = str(entry.expanded_src()) + repo_files = source.files_at(tip, src_root) + for dest in entry.expanded_dests(): + for repo_file in repo_files: + to_deploy[dest / Path(repo_file).relative_to(src_root)] = repo_file + files_to_deploy = set(to_deploy) + + # Already deployed files + for dest, src in deployed.items(): + dest_path = Path(dest) + + def try_read(rev: str, path: str) -> bytes: + try: + return source.blob_at(rev, path) + except SourceError: + # Read failed, keep it just in case + stale.append(dest_path) + raise _Continue + + try: + if dest_path not in files_to_deploy: # No longer managed by any entry + if not dest_path.exists(): + # Gone from disk and no entry manages it + untracked.append(dest_path) + continue + + local = _read_local(dest_path) + if local is not None and has_base and try_read(applied_rev, src) == local: + deletes.append(dest_path) + else: + # Modified, or unreadable so we can't verify; keep it just in case + stale.append(dest_path) + else: # Still managed; `src` is what we last placed, `new_src` the current source + new_src = to_deploy[dest_path] + if not dest_path.exists(): + # User deleted a managed file locally + if has_base and new_src == src and new_src not in changed: + continue # Respect the deletion; upstream has nothing new to offer + # Upstream changed it (or base is unknown): surface as .new, don't restore + deleted_changed.append((new_src, dest_path)) + continue + + if has_base and new_src == src and new_src not in changed: + continue # Unchanged upstream + + dest_content = _read_local(dest_path) + if dest_content is None: + # Unreadable (perms, became a dir, ...); surface upstream as .new, don't clobber + conflicts.append((new_src, dest_path)) + continue + + if try_read(tip, new_src) == dest_content: + # Already up to date; restate the mapping if the source path moved + if new_src != src: + remap.append((new_src, dest_path)) + continue + + # Fast-forward only when the user hasn't edited since last deploy + if has_base and try_read(applied_rev, src) == dest_content: + place.append((new_src, dest_path)) + else: + conflicts.append((new_src, dest_path)) + except _Continue: + continue + + # New files to deploy + for dest in files_to_deploy - set(Path(d) for d in deployed): + src = to_deploy[dest] + try: + new_content = source.blob_at(tip, src) + except SourceError: + # Failed to read the upstream blob; skip rather than abort the whole update + warn(f"could not read from source, skipping: {src}") + continue + if not dest.exists() or new_content == _read_local(dest): + # Dest nonexistent or already equal to new content + place.append((src, dest)) + else: + # Differs, or exists but unreadable; surface upstream as .new + conflicts.append((src, dest)) + + return Changeset( + place=place, + conflicts=conflicts, + deletes=deletes, + stale=stale, + deleted_changed=deleted_changed, + untracked=untracked, + remap=remap, + ) diff --git a/src/caelestia/utils/dots/manifest.py b/src/caelestia/utils/dots/manifest.py index f4acfb5..297f3f6 100644 --- a/src/caelestia/utils/dots/manifest.py +++ b/src/caelestia/utils/dots/manifest.py @@ -7,6 +7,8 @@ from pathlib import Path from string import Template from typing import Any +from caelestia.utils.io import warn + _XDG_DEFAULTS = { "XDG_CONFIG_HOME": str(Path.home() / ".config"), "XDG_DATA_HOME": str(Path.home() / ".local/share"), @@ -25,36 +27,38 @@ class ComponentError(Exception): """Raised when component flags are invalid or contradictory.""" -def expand(text: str) -> Path: +def _expand(text: str) -> Path: """Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path.""" env = {**_XDG_DEFAULTS, **os.environ} return Path(Template(text).safe_substitute(env)).expanduser() -def expand_dests(dest: str) -> list[Path]: - """Expand globs within a dest path. - - Globs from the start until the segment with the last glob so subdirs are - created if they didn't exist previously. - """ - - expanded = expand(dest) - if not _GLOB_MAGIC.search(str(expanded)): - return [expanded] - - parts = expanded.parts - glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part)) - pattern = str(Path(*parts[: glob_idx + 1])) - tail = parts[glob_idx + 1 :] - return [Path(match, *tail) for match in sorted(glob.glob(pattern))] - - @dataclass(frozen=True) class ManifestEntry: src: str dest: str + def expanded_src(self) -> Path: + return _expand(self.src) + + def expanded_dests(self) -> list[Path]: + """The dest path with globs expanded. + + Globs from the start until the segment with the last glob so subdirs are + created if they didn't exist previously. + """ + + expanded = _expand(self.dest) + if not _GLOB_MAGIC.search(str(expanded)): + return [expanded] + + parts = expanded.parts + glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part)) + pattern = str(Path(*parts[: glob_idx + 1])) + tail = parts[glob_idx + 1 :] + return [Path(match, *tail) for match in sorted(glob.glob(pattern))] + @dataclass(frozen=True) class ManifestComponent: @@ -103,6 +107,8 @@ class Manifest: components = {} for comp in raw.get("components", []): parsed = _parse_component(comp) + if parsed.name in components: + warn(f"duplicate component '{parsed.name}'; using the last definition") components[parsed.name] = parsed return Manifest( diff --git a/src/caelestia/utils/dots/misc.py b/src/caelestia/utils/dots/misc.py new file mode 100644 index 0000000..c3c15a8 --- /dev/null +++ b/src/caelestia/utils/dots/misc.py @@ -0,0 +1,39 @@ +import os +import subprocess + +from caelestia.utils.dots.manifest import Manifest +from caelestia.utils.dots.packages import PackageInstaller +from caelestia.utils.dots.source import DotsSource +from caelestia.utils.io import info, log, warn +from caelestia.utils.paths import dots_dir + + +def build_local_packages(installer: PackageInstaller, source: DotsSource, paths: list[str]) -> dict[str, list[str]]: + """Build and install each local PKGBUILD dir, returning {path: installed package names}.""" + + built: dict[str, list[str]] = {} + for path in paths: + directory = source.working_path(path) + if not directory.is_dir(): + warn(f"missing in repo, skipping: {path}") + continue + log(f"Building {path}...") + built[path] = installer.build_install(directory) + return built + + +def run_hooks(manifest: Manifest, kind: str) -> None: + """Run the global + enabled components' hooks of the given kind (e.g. "post_install").""" + + hooks = manifest.enabled_hooks(kind) + if not hooks: + return + + print() + log(f"Running {kind.replace('_', '-')} hooks...") + env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)} + for hook in hooks: + info(f"Running hook: {hook}") + result = subprocess.run(hook, shell=True, env=env) + if result.returncode != 0: + warn(f"hook exited with {result.returncode}") diff --git a/src/caelestia/utils/dots/packages.py b/src/caelestia/utils/dots/packages.py index d967c42..0b66ec7 100644 --- a/src/caelestia/utils/dots/packages.py +++ b/src/caelestia/utils/dots/packages.py @@ -73,6 +73,9 @@ class PackageInstaller(ABC): def build_install(self, directory: Path) -> list[str]: """Build and install the PKGBUILD in `directory`, returning the installed package names.""" + @abstractmethod + def system_update(self) -> None: ... + class NoopInstaller(PackageInstaller): """Used off Arch, where the dots' packages are not available via pacman/AUR.""" @@ -89,6 +92,9 @@ class NoopInstaller(PackageInstaller): info(f"Skipping local package build (not on Arch): {directory}") return [] + def system_update(self) -> None: + info("Skipping system update (not on Arch)") + class ArchInstaller(PackageInstaller): def __init__(self, helper: str, noconfirm: bool = False) -> None: @@ -128,3 +134,6 @@ class ArchInstaller(PackageInstaller): subprocess.run(["makepkg", "-fsi", *self.flags], cwd=directory, env=env, check=True) return names + + def system_update(self) -> None: + subprocess.run([self.helper, "-Syu", *self.flags], check=True) diff --git a/src/caelestia/utils/dots/source.py b/src/caelestia/utils/dots/source.py index d4f79fd..d2c7159 100644 --- a/src/caelestia/utils/dots/source.py +++ b/src/caelestia/utils/dots/source.py @@ -17,6 +17,8 @@ class DotsSource: cfg = get_config().get("dots", {}) self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git") self.branch = cfg.get("branch", "main") + # Cache git blobs by (ref, relpath); objects are immutable for a given rev + self._blob_cache: dict[tuple[str, str], bytes] = {} @property def remote_ref(self) -> str: @@ -68,6 +70,15 @@ class DotsSource: out = self._git("diff", "--name-only", base, head) return [line for line in out.splitlines() if line] + def has_rev(self, rev: str) -> bool: + """Whether `rev` resolves to a commit.""" + + try: + self._git("rev-parse", "--verify", "--quiet", f"{rev}^{{commit}}") + return True + except SourceError: + return False + def clean(self) -> None: """Remove all untracked files in the git repo.""" self._git("clean", "-fdx") @@ -81,7 +92,10 @@ class DotsSource: return self._git("show", f"{ref}:{relpath}") def blob_at(self, ref: str, relpath: str) -> bytes: - return self._git_bytes("show", f"{ref}:{relpath}") + key = (ref, relpath) + if key not in self._blob_cache: + self._blob_cache[key] = self._git_bytes("show", f"{ref}:{relpath}") + return self._blob_cache[key] def files_at(self, ref: str, relpath: str) -> list[str]: """Repo-relative paths of all files under relpath at ref (the path itself if it is a file).""" @@ -92,10 +106,12 @@ class DotsSource: # --- Helpers --- def _git(self, *args: str) -> str: - return self._run("git", "-C", str(dots_dir), *args) + # core.quotePath=false so non-ASCII paths come back verbatim, not octal-escaped + return self._run("git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args) def _git_bytes(self, *args: str) -> bytes: - result = subprocess.run(["git", "-C", str(dots_dir), *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd = ["git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args] + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed") return result.stdout diff --git a/src/caelestia/utils/dots/state.py b/src/caelestia/utils/dots/state.py index d47f3c9..8a9aabc 100644 --- a/src/caelestia/utils/dots/state.py +++ b/src/caelestia/utils/dots/state.py @@ -21,6 +21,10 @@ class DotsState: packages: list[str] = field(default_factory=list) local_packages: dict[str, list[str]] = field(default_factory=dict) + # Files placed by the last deploy. Only files, not directories + # Maps dest -> src + deployed_files: dict[str, str] = field(default_factory=dict) + @staticmethod def load() -> "DotsState": try: @@ -37,6 +41,7 @@ class DotsState: enabled_components=data.get("enabled_components", []), packages=data.get("packages", []), local_packages=data.get("local_packages", {}), + deployed_files=data.get("deployed_files", {}), ) def save(self) -> None: @@ -48,5 +53,6 @@ class DotsState: "enabled_components": self.enabled_components, "packages": self.packages, "local_packages": self.local_packages, + "deployed_files": self.deployed_files, }, ) diff --git a/src/caelestia/utils/io.py b/src/caelestia/utils/io.py index 020ca45..a4a8f25 100644 --- a/src/caelestia/utils/io.py +++ b/src/caelestia/utils/io.py @@ -80,6 +80,57 @@ def confirm(msg: str, prefix: bool = True, default: bool = True) -> bool: return answer in ("y", "yes") +def prompt_selection(items: list[str], header: str) -> list[str]: + """Prompt the user to pick from a numbered list, returning the selected items. + + Accepts `[A]ll`/`a`, single indices, ranges (`1-3`) and exclusions (`^4`). + Empty input selects nothing. Re-prompts until the input parses. + """ + + print(format_msg(PROMPT_COLOUR, True, header)) + max_idx_w = len(str(len(items))) + for i, item in enumerate(items): + print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{item}")) + print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)")) + + def valid_idx(v: str) -> int: + try: + idx = int(v, base=10) - 1 # -1 to translate to 0 index + except ValueError: + raise ValueError(f'Given value "{v}" must be an integer.') + if idx < 0 or idx >= len(items): + raise ValueError(f'Given value "{v}" must be between 1 and {len(items)} inclusive.') + return idx + + def parse(ans: str) -> list[str]: + if ans in ("a", "all"): + return list(items) + if not ans: + return [] + + selected: list[str] = [] + for tok in ans.split(): + fr, sep, to = tok.partition("-") + if sep: + lo, hi = valid_idx(fr), valid_idx(to) + if lo > hi: + raise ValueError(f'Given range "{tok}" must be lo-hi.') + selected += items[lo : hi + 1] + elif tok.startswith("^"): + t = valid_idx(tok[1:]) + selected += items[:t] + items[t + 1 :] + else: + selected.append(items[valid_idx(tok)]) + return list(set(selected)) + + while True: + ans = prompt("", end="").lower().strip() + try: + return parse(ans) + except ValueError as e: + warn(f"invalid input. {e} Please try again.") + + def pause() -> None: if _disable_input: return