diff --git a/src/caelestia/subcommands/install.py b/src/caelestia/subcommands/install.py new file mode 100644 index 00000000..ff53d571 --- /dev/null +++ b/src/caelestia/subcommands/install.py @@ -0,0 +1,171 @@ +import os +import shutil +import subprocess +import textwrap +from argparse import Namespace +from pathlib import Path + +from caelestia.utils.dots.deployer import Deployer +from caelestia.utils.dots.manifest import Manifest, ManifestError, expand, expand_dests +from caelestia.utils.dots.packages import PackageInstaller +from caelestia.utils.dots.source import DotsSource, SourceError +from caelestia.utils.dots.state import DotsState +from caelestia.utils.io import confirm, disable_input, fatal, info, log, pause, warn +from caelestia.utils.paths import ( + config_backup_dir, + config_dir, + dots_dir, +) + + +def _parse_list_arg(value: str | None) -> list[str] | None: + if value is None: + return None + return [item.strip() for item in value.split(",") if item.strip()] + + +class Command: + args: Namespace + + def __init__(self, args: Namespace) -> None: + self.args = args + + def run(self) -> None: + if self.args.noconfirm: + disable_input() + + self.print_greeting() + self.create_backup() + + source, tip, manifest = self.fetch_manifest() + self.deploy_configs(source, manifest) + helper, packages, local_packages = self.install_packages(source, manifest) + self.run_hooks(manifest) + + DotsState( + aur_helper=helper, + applied_rev=tip, + enabled_components=manifest.enabled_components, + packages=packages, + local_packages=local_packages, + ).save() + + info("Done!") + + def print_greeting(self) -> None: + print( + "\033[38;2;150;241;241m" # Caelestia colour + + textwrap.dedent( + r""" + ╭─────────────────────────────────────────────────╮ + │ ______ __ __ _ │ + │ / ____/___ ____ / /__ _____/ /_(_)___ _ │ + │ / / / __ `/ _ \/ / _ \/ ___/ __/ / __ `/ │ + │ / /___/ /_/ / __/ / __(__ ) /_/ / /_/ / │ + │ \____/\__,_/\___/_/\___/____/\__/_/\__,_/ │ + │ │ + ╰─────────────────────────────────────────────────╯ + """ + ) + + "\033[0m" + ) + info("Welcome to the Caelestia dotfiles installer!") + info("Here's a quick overview on what this command is going to do:") + info(" - Install dependencies") + info(" - Install config files") + info("The installer does NOT set up hardware/system level configs (e.g. drivers). Please do this yourself.") + pause() + + def create_backup(self) -> None: + if config_dir.exists(): + if not confirm("Back up the config directory?", default=True): + return + + log(f"Creating a backup of {config_dir}...") + if config_backup_dir.exists(): + if not confirm("A backup already exists, overwrite?", default=False): + info("Not creating backup.") + return + + log("Deleting old backup...") + shutil.rmtree(config_backup_dir) + + shutil.copytree(config_dir, config_backup_dir, symlinks=True) + info(f"Created backup at {config_backup_dir}") + + def fetch_manifest(self) -> tuple[DotsSource, str, Manifest]: + log("Fetching dots repo...") + source = DotsSource() + try: + source.ensure() + tip = source.checkout_tip() + except SourceError as e: + fatal(e) + + try: + manifest = source.manifest_at(tip) + manifest.resolve_components( + enable=_parse_list_arg(self.args.enable_components), + disable=_parse_list_arg(self.args.disable_components), + ) + except ManifestError as e: + fatal(e) + + names = ", ".join(manifest.enabled_components) or "none" + info(f"Enabled components: {names}") + + return source, tip, manifest + + def deploy_configs(self, source: DotsSource, manifest: Manifest) -> None: + log("Installing configs...") + deployer = Deployer() + for entry in manifest.enabled_entries(): + src = source.working_path(expand(entry.src)) + if not src.exists(): + warn(f"missing in source, skipping: {entry.src}") + continue + + dests = expand_dests(entry.dest) + if not dests: + warn(f"dest glob matched nothing, skipping: {entry.dest}") + continue + + for dest in dests: + deployer.place(src, Path(dest)) + info(f"{entry.src} -> {dest}") + + def install_packages(self, source: DotsSource, manifest: Manifest) -> tuple[str, list[str], dict[str, list[str]]]: + installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm) + + packages = manifest.enabled_packages() + if packages: + log("Installing packages...") + installer.install(packages) + + local_packages = {} + local_dirs = manifest.enabled_local_packages() + if local_dirs: + log("Building local packages...") + for path in local_dirs: + directory = source.working_path(path) + if not directory.is_dir(): + warn(f"missing in repo, skipping: {path}") + continue + + log(f"Building {path}...") + local_packages[path] = installer.build_install(directory) + + return getattr(installer, "helper", ""), packages, local_packages + + def run_hooks(self, manifest: Manifest) -> None: + hooks = manifest.enabled_hooks("post_install") + if not hooks: + return + + log("Running post-install hooks...") + env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)} + for hook in hooks: + log(f"Running hook: {hook}") + result = subprocess.run(hook, shell=True, env=env) + if result.returncode != 0: + warn(f"hook exited with {result.returncode}") diff --git a/src/caelestia/utils/dots/deployer.py b/src/caelestia/utils/dots/deployer.py new file mode 100644 index 00000000..586bc046 --- /dev/null +++ b/src/caelestia/utils/dots/deployer.py @@ -0,0 +1,58 @@ +import shutil +import tempfile +from pathlib import Path + + +class Deployer: + """Places files from the dots clone into their destinations.""" + + def place(self, src: Path, dest: Path) -> None: + """Place a whole entry (file or directory tree), replacing any existing dest.""" + + if src.is_dir(): + self.place_dir(src, dest) + else: + self.place_file(src, dest) + + def place_dir(self, src: Path, dest: Path) -> None: + """Place a directory tree recursively, replacing any existing dest.""" + + if dest.is_symlink() or dest.is_file(): + self.remove(dest) + + dest.mkdir(parents=True, exist_ok=True) + for path in src.rglob("*"): + if path.is_file(): + self.place_file(path, dest / path.relative_to(src)) + elif path.is_dir(): + (dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True) + + def place_file(self, src: Path, dest: Path) -> None: + """Atomically place a single file, replacing any existing dest.""" + + if dest.is_dir() and not dest.is_symlink(): + self.remove(dest) + + dest.parent.mkdir(parents=True, exist_ok=True) + f = tempfile.NamedTemporaryFile(dir=dest.parent, delete=False) + f.close() + try: + shutil.copyfile(src, f.name) + shutil.copymode(src, f.name) + Path(f.name).replace(dest) + except BaseException: + Path(f.name).unlink() + raise + + def write_new(self, src: Path, dest: Path) -> Path: + """Write the upstream version alongside dest as .new and return that path.""" + + new_path = dest.parent / f"{dest.name}.new" + self.place_file(src, new_path) + return new_path + + def remove(self, path: Path) -> None: + if path.is_symlink() or path.is_file(): + path.unlink() + elif path.is_dir(): + shutil.rmtree(path) diff --git a/src/caelestia/utils/dots/manifest.py b/src/caelestia/utils/dots/manifest.py new file mode 100644 index 00000000..4e02c73f --- /dev/null +++ b/src/caelestia/utils/dots/manifest.py @@ -0,0 +1,205 @@ +import glob +import os +import re +import tomllib +from dataclasses import dataclass, field +from pathlib import Path +from string import Template +from typing import Any + +_XDG_DEFAULTS = { + "XDG_CONFIG_HOME": str(Path.home() / ".config"), + "XDG_DATA_HOME": str(Path.home() / ".local/share"), + "XDG_STATE_HOME": str(Path.home() / ".local/state"), + "XDG_CACHE_HOME": str(Path.home() / ".cache"), +} +_GLOB_MAGIC = re.compile(r"[*?[]") +_LOCAL_PREFIX = "local:" + + +class ManifestError(Exception): + """Raised when manifest.toml is malformed.""" + + +class ComponentError(Exception): + """Raised when component flags are invalid or contradictory.""" + + +def expand(text: str) -> Path: + """Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path.""" + + env = {**_XDG_DEFAULTS, **os.environ} + return Path(Template(text).safe_substitute(env)).expanduser() + + +def expand_dests(dest: str) -> list[Path]: + """Expand globs within a dest path. + + Globs from the start until the segment with the last glob so subdirs are + created if they didn't exist previously. + """ + + expanded = expand(dest) + if not _GLOB_MAGIC.search(str(expanded)): + return [expanded] + + parts = expanded.parts + glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part)) + pattern = str(Path(*parts[: glob_idx + 1])) + tail = parts[glob_idx + 1 :] + return [Path(match, *tail) for match in sorted(glob.glob(pattern))] + + +@dataclass(frozen=True) +class ManifestEntry: + src: str + dest: str + + +@dataclass(frozen=True) +class ManifestComponent: + name: str + default: bool = False + packages: list[str] = field(default_factory=list) + entries: list[ManifestEntry] = field(default_factory=list) + post_install: list[str] = field(default_factory=list) + post_update: list[str] = field(default_factory=list) + + +@dataclass +class _ManifestData: + enabled_comps: list[str] = field(default_factory=list) + disabled_comps: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class Manifest: + components: dict[str, ManifestComponent] = field(default_factory=dict) + packages: list[str] = field(default_factory=list) + post_install: list[str] = field(default_factory=list) + post_update: list[str] = field(default_factory=list) + _data: _ManifestData = field(default_factory=_ManifestData, init=False, repr=False) + + @property + def enabled_components(self) -> list[str]: + return self._data.enabled_comps + + @property + def disabled_components(self) -> list[str]: + return self._data.disabled_comps + + @staticmethod + def parse(text: str) -> "Manifest": + try: + raw = tomllib.loads(text) + except tomllib.TOMLDecodeError as e: + raise ManifestError(f"invalid TOML: {e}") from e + + hooks = raw.get("hooks", {}) + post_install = _validate_str_list(hooks.get("post_install", []), "hooks.post_install") + post_update = _validate_str_list(hooks.get("post_update", []), "hooks.post_update") + + packages = _validate_str_list(raw.get("packages", []), "packages") + + components = {} + for comp in raw.get("components", []): + parsed = _parse_component(comp) + components[parsed.name] = parsed + + return Manifest( + components=components, + packages=packages, + post_install=post_install, + post_update=post_update, + ) + + def resolve_components( + self, + enable: list[str] | None = None, + disable: list[str] | None = None, + ) -> None: + """Resolves enabled/disabled components. This MUST be called before calling any other method.""" + + enable_set = set(enable or []) + disable_set = set(disable or []) + known = set(self.components) + + for name in enable_set | disable_set: + if name not in known: + raise ManifestError(f"unknown component: {name}") + + conflict = enable_set & disable_set + if conflict: + raise ManifestError(f"component(s) both enabled and disabled: {', '.join(sorted(conflict))}") + + enabled = {name for name, comp in self.components.items() if comp.default} + enabled |= enable_set + enabled -= disable_set + + for name in self.components: + if name in enabled: + self._data.enabled_comps.append(name) + else: + self._data.disabled_comps.append(name) + + def enabled_entries(self) -> list[ManifestEntry]: + """The entries of every enabled component.""" + + entries: list[ManifestEntry] = [] + for name in self._data.enabled_comps: + entries.extend(self.components[name].entries) + return entries + + def enabled_hooks(self, kind: str) -> list[str]: + """Global + enabled components' hooks of the given kind.""" + + hooks = list(getattr(self, kind)) + for name in self._data.enabled_comps: + hooks.extend(getattr(self.components[name], kind)) + return hooks + + def enabled_packages(self) -> list[str]: + """Repo/AUR packages to install.""" + return [p for p in self._all_packages() if not p.startswith(_LOCAL_PREFIX)] + + def enabled_local_packages(self) -> list[str]: + """Local PKGBUILD dirs to build. + + Local packages are determined by a local: prefix and are + relative dirs instead of package names. + """ + return [p[len(_LOCAL_PREFIX) :] for p in self._all_packages() if p.startswith(_LOCAL_PREFIX)] + + def _all_packages(self) -> list[str]: + """The manifest's top-level packages plus enabled components'.""" + return list(set(self.packages) | set(p for c in self._data.enabled_comps for p in self.components[c].packages)) + + +def _require_key(d: dict[str, Any], key: str, ctx: str) -> Any: + if key not in d: + raise ManifestError(f"{ctx}: missing required key '{key}'") + return d[key] + + +def _validate_str_list(value: Any, ctx: str) -> list[str]: + if not isinstance(value, list) or not all(isinstance(v, str) for v in value): + raise ManifestError(f"{ctx}: expected a list of strings") + return value + + +def _parse_entry(d: Any) -> ManifestEntry: + if not isinstance(d, dict): + raise ManifestError("entry: expected a table") + return ManifestEntry(src=_require_key(d, "src", "entry"), dest=_require_key(d, "dest", "entry")) + + +def _parse_component(d: dict[str, Any]) -> ManifestComponent: + name = _require_key(d, "name", "component") + return ManifestComponent( + name=name, + default=bool(d.get("default", False)), + packages=_validate_str_list(d.get("packages", []), f"component '{name}' packages"), + entries=[_parse_entry(e) for e in d.get("entries", [])], + post_install=_validate_str_list(d.get("post_install", []), f"component '{name}' post_install"), + post_update=_validate_str_list(d.get("post_update", []), f"component '{name}' post_update"), + ) diff --git a/src/caelestia/utils/dots/packages.py b/src/caelestia/utils/dots/packages.py new file mode 100644 index 00000000..3d3e7cd6 --- /dev/null +++ b/src/caelestia/utils/dots/packages.py @@ -0,0 +1,129 @@ +import shutil +import subprocess +from abc import ABC, abstractmethod +from pathlib import Path + +from caelestia.utils.io import fatal, info + +AUR_HELPERS = "paru", "yay" + + +def _install_aur_helper(helper: str, noconfirm: bool = False) -> None: + pacman_cmd = ["sudo", "pacman", "-S", "--needed", "git", "base-devel"] + if noconfirm: + pacman_cmd.append("--noconfirm") + subprocess.run(pacman_cmd, check=True) + + repo_url = f"https://aur.archlinux.org/{helper}.git" + repo_dir = f"/tmp/{helper}" + subprocess.run(["git", "clone", repo_url, repo_dir], check=True) + + makepkg_cmd = ["makepkg", "-si"] + if noconfirm: + makepkg_cmd.append("--noconfirm") + subprocess.run(makepkg_cmd, cwd=repo_dir, check=True) + + try: + shutil.rmtree(repo_dir) + except FileNotFoundError: + pass + + if helper == "yay": + subprocess.run(["yay", "-Y", "--gendb"], check=True) + subprocess.run(["yay", "-Y", "--devel", "--save"], check=True) + else: + subprocess.run(["paru", "--gendb"], check=True) + + +class PackageInstaller(ABC): + @staticmethod + def get(helper: str | None = None, noconfirm: bool = False) -> "PackageInstaller": + """Pick a package installer: the requested/detected AUR helper on Arch, else a no-op.""" + + # Not on Arch, can't install packages + if shutil.which("pacman") is None: + return NoopInstaller() + + # Explicitly given + if helper: + if not shutil.which(helper): + if helper not in AUR_HELPERS: + fatal(f"given AUR helper {helper} is not installed and is unable to be installed automatically.") + + info(f"Given AUR helper not installed. Installing {helper}...") + _install_aur_helper(helper, noconfirm) + return ArchInstaller(helper, noconfirm) + + # Not given, find installed one + for candidate in AUR_HELPERS: + if shutil.which(candidate): + return ArchInstaller(candidate, noconfirm) + + info("No AUR helper found. Installing paru...") + _install_aur_helper("paru", noconfirm) + return ArchInstaller("paru", noconfirm) + + # --- Abstract methods --- + + @abstractmethod + def install(self, packages: list[str]) -> None: ... + + @abstractmethod + def remove(self, packages: list[str]) -> None: ... + + @abstractmethod + def build_install(self, directory: Path) -> list[str]: + """Build and install the PKGBUILD in `directory`, returning the installed package names.""" + + +class NoopInstaller(PackageInstaller): + """Used off Arch, where the dots' packages are not available via pacman/AUR.""" + + def install(self, packages: list[str]) -> None: + if packages: + info(f"Skipping package install (not on Arch): {', '.join(packages)}") + + def remove(self, packages: list[str]) -> None: + if packages: + info(f"Skipping package removal (not on Arch): {', '.join(packages)}") + + def build_install(self, directory: Path) -> list[str]: + info(f"Skipping local package build (not on Arch): {directory}") + return [] + + +class ArchInstaller(PackageInstaller): + def __init__(self, helper: str, noconfirm: bool = False) -> None: + self.helper = helper + self.flags = ["--noconfirm"] if noconfirm else [] + + def install(self, packages: list[str], extra_flags: list[str] = []) -> None: + if not packages: + return + subprocess.run([self.helper, "-S", "--needed", *self.flags, *extra_flags, *packages], check=True) + + def remove(self, packages: list[str]) -> None: + if not packages: + return + subprocess.run([self.helper, "-Rns", *self.flags, *packages], check=True) + + def build_install(self, directory: Path) -> list[str]: + srcinfo = subprocess.check_output(["makepkg", "--printsrcinfo"], cwd=directory, text=True) + names = [] + depends = [] + for line in srcinfo.splitlines(): + key, sep, value = line.partition("=") + if not sep: + continue + + key = key.strip() + if key == "pkgname": + names.append(value.strip()) + elif key == "depends": + depends.append(value.strip()) + + self.install(depends, extra_flags=["--asdeps"]) + # -f = force, -s = sync deps, -i = install + subprocess.run(["makepkg", "-fsi", *self.flags], cwd=directory, check=True) + + return names diff --git a/src/caelestia/utils/dots/source.py b/src/caelestia/utils/dots/source.py new file mode 100644 index 00000000..76f45f36 --- /dev/null +++ b/src/caelestia/utils/dots/source.py @@ -0,0 +1,89 @@ +import subprocess +from pathlib import Path + +from caelestia.utils.dots.manifest import Manifest +from caelestia.utils.paths import dots_dir, get_config + + +class SourceError(Exception): + """Raised when a git operation against the dots clone fails.""" + + +class DotsSource: + def __init__(self) -> None: + cfg = get_config().get("dots", {}) + self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git") + self.branch = cfg.get("branch", "main") + + @property + def remote_ref(self) -> str: + return f"origin/{self.branch}" + + def exists(self) -> bool: + return (dots_dir / ".git").is_dir() + + def working_path(self, relpath: str | Path) -> Path: + """Get a Path relative to the dots dir.""" + return dots_dir / relpath + + def ensure(self) -> None: + """Clone the repo if absent, otherwise fetch the latest refs.""" + + if self.exists(): + self._git("fetch", "--prune", "origin", self.branch) + else: + dots_dir.parent.mkdir(parents=True, exist_ok=True) + self._run("git", "clone", "--branch", self.branch, self.url, str(dots_dir)) + + def checkout_tip(self) -> str: + """Reset the working tree to the fetched tip and return its commit hash.""" + + self._git("reset", "--hard", self.remote_ref) + return self.tip_rev() + + def tip_rev(self) -> str: + return self._git("rev-parse", self.remote_ref).strip() + + def changed_files(self, base: str, head: str) -> list[str]: + """Repo-relative paths that differ between two revisions.""" + + out = self._git("diff", "--name-only", base, head) + return [line for line in out.splitlines() if line] + + def clean(self) -> None: + """Remove all untracked files in the git repo.""" + self._git("clean", "-fdx") + + # --- Accessors --- + + def manifest_at(self, ref: str) -> Manifest: + return Manifest.parse(self.text_at(ref, "manifest.toml")) + + def text_at(self, ref: str, relpath: str) -> str: + return self._git("show", f"{ref}:{relpath}") + + def blob_at(self, ref: str, relpath: str) -> bytes: + return self._git_bytes("show", f"{ref}:{relpath}") + + def files_at(self, ref: str, relpath: str) -> list[str]: + """Repo-relative paths of all files under relpath at ref (the path itself if it is a file).""" + + out = self._git("ls-tree", "-r", "--name-only", ref, "--", relpath) + return [line for line in out.splitlines() if line] + + # --- Helpers --- + + def _git(self, *args: str) -> str: + return self._run("git", "-C", str(dots_dir), *args) + + def _git_bytes(self, *args: str) -> bytes: + result = subprocess.run(["git", "-C", str(dots_dir), *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if result.returncode != 0: + raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed") + return result.stdout + + def _run(self, *cmd: str) -> str: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: + raise SourceError(result.stderr.strip() or f"{' '.join(cmd)} failed") + return result.stdout diff --git a/src/caelestia/utils/dots/state.py b/src/caelestia/utils/dots/state.py new file mode 100644 index 00000000..9529bf86 --- /dev/null +++ b/src/caelestia/utils/dots/state.py @@ -0,0 +1,51 @@ +import json +from dataclasses import dataclass, field + +from caelestia.utils.io import warn +from caelestia.utils.paths import atomic_dump, dots_state_path + + +@dataclass +class DotsState: + # The AUR helper selected selected at install time + aur_helper: str = "paru" + + # The git rev of currently applied dots version + applied_rev: str | None = None + + # The currently enabled components + enabled_components: list[str] = field(default_factory=list) + + # Previously installed packages/local packages + packages: list[str] = field(default_factory=list) + local_packages: dict[str, list[str]] = field(default_factory=dict) + + @staticmethod + def load() -> "DotsState": + try: + data = json.loads(dots_state_path.read_text()) + except FileNotFoundError: + return DotsState() + except json.JSONDecodeError: + warn("failed to parse current dots state.") + return DotsState() + + return DotsState( + aur_helper=data.get("aur_helper"), + applied_rev=data.get("applied_rev"), + enabled_components=data.get("enabled_components", []), + packages=data.get("packages", []), + local_packages=data.get("local_packages", {}), + ) + + def save(self) -> None: + atomic_dump( + dots_state_path, + { + "aur_helper": self.aur_helper, + "applied_rev": self.applied_rev, + "enabled_components": self.enabled_components, + "packages": self.packages, + "local_packages": self.local_packages, + }, + ) diff --git a/src/caelestia/utils/paths.py b/src/caelestia/utils/paths.py index 62348d9d..a4b50c2d 100644 --- a/src/caelestia/utils/paths.py +++ b/src/caelestia/utils/paths.py @@ -25,6 +25,10 @@ templates_dir: Path = cli_data_dir / "templates" user_templates_dir: Path = c_config_dir / "templates" theme_dir: Path = c_state_dir / "theme" +config_backup_dir: Path = config_dir.parent / f"{config_dir.name}.bak" +dots_dir: Path = c_state_dir / "dots" +dots_state_path: Path = c_state_dir / "dots-state.json" + scheme_path: Path = c_state_dir / "scheme.json" scheme_data_dir: Path = cli_data_dir / "schemes" scheme_cache_dir: Path = c_cache_dir / "schemes"