Merge pull request #125 from caelestia-dots/feat/add-update-cmd

feat: add update cmd
This commit is contained in:
2 * r + 2 * t
2026-06-18 00:59:39 +10:00
committed by GitHub
12 changed files with 590 additions and 115 deletions
+6 -1
View File
@@ -1,7 +1,7 @@
set -l seen '__fish_seen_subcommand_from'
set -l has_opt '__fish_contains_opt'
set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install
set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install update
set -l not_seen "not $seen $commands"
# Disable file completions
@@ -21,6 +21,7 @@ complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities'
complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper'
complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer'
complete -c caelestia -n $not_seen -a 'install' -d 'Install the Caelestia dotfiles'
complete -c caelestia -n $not_seen -a 'update' -d 'Update the Caelestia dotfiles'
# Shell
set -l commands mpris drawers wallpaper notifs
@@ -133,3 +134,7 @@ complete -c caelestia -n "$seen install" -l 'aur-helper' -d 'The AUR helper to u
complete -c caelestia -n "$seen install" -l 'enable-components' -d 'List of components to enable' -r
complete -c caelestia -n "$seen install" -l 'disable-components' -d 'List of components to disable' -r
complete -c caelestia -n "$seen install" -l 'noconfirm' -d 'Use defaults for all prompts'
# Update
complete -c caelestia -n "$seen update" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r
complete -c caelestia -n "$seen update" -l 'noconfirm' -d 'Use defaults for all prompts'
+9 -1
View File
@@ -11,9 +11,11 @@ from caelestia.subcommands import (
screenshot,
shell,
toggle,
update,
wallpaper,
)
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.packages import AUR_HELPERS
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import warn
from caelestia.utils.paths import wallpapers_dir
@@ -150,7 +152,7 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
formatter_class=argparse.RawDescriptionHelpFormatter,
)
install_parser.set_defaults(cls=install.Command)
install_parser.add_argument("--aur-helper", choices=["yay", "paru"], help="the AUR helper to use")
install_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use")
install_parser.add_argument(
"--enable-components", metavar="LIST", help="comma-separated list of components to enable"
)
@@ -160,6 +162,12 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
install_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
_set_install_epilog(install_parser)
# Create parser for update opts
update_parser = command_parser.add_parser("update", help="update the Caelestia dotfiles")
update_parser.set_defaults(cls=update.Command)
update_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use")
update_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
return parser, parser.parse_args()
+18 -89
View File
@@ -1,20 +1,18 @@
import os
import shutil
import subprocess
import textwrap
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError, expand, expand_dests
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError
from caelestia.utils.dots.misc import build_local_packages, run_hooks
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER, PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import PROMPT_COLOUR, confirm, disable_input, fatal, format_msg, info, log, pause, prompt, warn
from caelestia.utils.io import confirm, disable_input, fatal, info, log, pause, prompt_selection, warn
from caelestia.utils.paths import (
config_backup_dir,
config_dir,
dots_dir,
)
@@ -38,9 +36,9 @@ class Command:
self.create_backup()
source, tip, manifest = self.fetch_manifest()
self.deploy_configs(source, manifest)
deployed = self.deploy_configs(source, manifest)
helper, packages, local_packages = self.install_packages(source, manifest)
self.run_hooks(manifest)
run_hooks(manifest, "post_install")
DotsState(
aur_helper=helper,
@@ -48,6 +46,7 @@ class Command:
enabled_components=manifest.enabled_components,
packages=packages,
local_packages=local_packages,
deployed_files=deployed,
).save()
self.print_done()
@@ -108,10 +107,14 @@ class Command:
disable = _parse_list_arg(self.args.disable_components)
try:
manifest = source.manifest_at(tip)
manifest.resolve_components(enable=enable, disable=disable)
# No flags given, prompt user for non-default components
if enable is None and disable is None:
self.prompt_optional_components(manifest)
optional = [name for name, comp in manifest.components.items() if not comp.default]
if optional:
enable = prompt_selection(optional, "Components to enable?")
manifest.resolve_components(enable=enable, disable=disable)
except (SourceError, ManifestError, ComponentError) as e:
fatal(e)
@@ -120,72 +123,17 @@ class Command:
return source, tip, manifest
def prompt_optional_components(self, manifest: Manifest) -> None:
comp_arr = manifest.disabled_components
if not comp_arr:
return
print(format_msg(PROMPT_COLOUR, True, "Components to enable?"))
max_idx_w = len(str(len(comp_arr)))
for i, comp in enumerate(comp_arr):
print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{comp}"))
print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)"))
def _valid_v(v: str) -> int:
try:
i_v = int(v, base=10) - 1 # -1 to translate to 0 index
except ValueError:
raise ValueError(f'Given value "{v}" must be an integer.')
if i_v < 0 or i_v >= len(comp_arr):
raise ValueError(f'Given value "{v}" must be between 1 and {len(comp_arr)} inclusive.')
return i_v
def _parse(ans: str) -> list[str] | None:
if ans in ("a", "all"):
return list(manifest.components)
if not ans:
return None
enabled: list[str] = []
for tok in ans.split():
fr, sep, to = tok.partition("-")
if sep:
fr = _valid_v(fr)
to = _valid_v(to)
if fr > to:
raise ValueError(f'Given range "{tok}" must be lo-hi.')
enabled += comp_arr[fr : to + 1]
elif tok.startswith("^"):
t = _valid_v(tok[1:])
enabled += comp_arr[:t] + comp_arr[t + 1 :]
else:
t = _valid_v(tok)
enabled.append(comp_arr[t])
return list(set(enabled))
while True:
ans = prompt("", end="").lower().strip()
try:
enabled = _parse(ans)
except ValueError as e:
warn(f"invalid input. {e} Please try again.")
continue
if enabled is not None:
manifest.resolve_components(enable=enabled)
return
def deploy_configs(self, source: DotsSource, manifest: Manifest) -> None:
def deploy_configs(self, source: DotsSource, manifest: Manifest) -> dict[str, str]:
print()
log("Installing configs...")
deployer = Deployer()
for entry in manifest.enabled_entries():
src = source.working_path(expand(entry.src))
src = source.working_path(entry.expanded_src())
if not src.exists():
warn(f"missing in source, skipping: {entry.src}")
continue
dests = expand_dests(entry.dest)
dests = entry.expanded_dests()
if not dests:
warn(f"dest glob matched nothing, skipping: {entry.dest}")
continue
@@ -194,6 +142,8 @@ class Command:
deployer.place(src, Path(dest))
info(f"{entry.src} -> {dest}")
return deployer.deployed_files
def install_packages(self, source: DotsSource, manifest: Manifest) -> tuple[str, list[str], dict[str, list[str]]]:
installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm)
@@ -208,31 +158,10 @@ class Command:
if local_dirs:
print()
log("Building local packages...")
for path in local_dirs:
directory = source.working_path(path)
if not directory.is_dir():
warn(f"missing in repo, skipping: {path}")
continue
log(f"Building {path}...")
local_packages[path] = installer.build_install(directory)
local_packages = build_local_packages(installer, source, local_dirs)
return getattr(installer, "helper", DEFAULT_AUR_HELPER), packages, local_packages
def run_hooks(self, manifest: Manifest) -> None:
hooks = manifest.enabled_hooks("post_install")
if not hooks:
return
print()
log("Running post-install hooks...")
env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)}
for hook in hooks:
info(f"Running hook: {hook}")
result = subprocess.run(hook, shell=True, env=env)
if result.returncode != 0:
warn(f"hook exited with {result.returncode}")
def print_done(self) -> None:
print()
info("All done! Caelestia has been installed.")
+227
View File
@@ -0,0 +1,227 @@
import sys
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.diff import Changeset
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError
from caelestia.utils.dots.misc import build_local_packages, run_hooks
from caelestia.utils.dots.packages import PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import disable_input, fatal, info, log, prompt_selection, warn
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.noconfirm:
disable_input()
state = DotsState.load()
if state.applied_rev is None:
fatal("dots not installed yet. Run `caelestia install` first.")
# Run system update
installer = PackageInstaller.get(self.args.aur_helper or state.aur_helper, self.args.noconfirm)
installer.system_update()
# Get manifest or exit if up to date
source, tip, manifest = self.fetch_manifest(state, state.applied_rev)
# Apply file changes
entries = manifest.enabled_entries()
try:
changeset = Changeset.compute(source, state.applied_rev, tip, entries, state.deployed_files)
source.checkout_tip()
except SourceError as e:
fatal(e)
new_files, revived_files, placed = self.deploy_changeset(source, changeset)
# Persist file changes immediately so a later failure can't lose track of them
deployed = dict(state.deployed_files)
for dest in (*changeset.deletes, *changeset.stale, *changeset.untracked):
deployed.pop(str(dest), None)
for repofile, dest in changeset.remap:
deployed[str(dest)] = repofile
deployed.update(placed)
state.deployed_files = deployed
state.save()
# Install new/remove old packages
desired = manifest.enabled_packages()
state.packages = self.sync_packages(installer, state.packages, desired)
state.save()
# Install new/remove old local PKGBUILD packages
desired_local = manifest.enabled_local_packages()
state.local_packages = self.sync_local_packages(installer, source, state.local_packages, desired_local)
state.save()
# Run hooks
run_hooks(manifest, "post_update")
# Mark the new revision applied
state.applied_rev = tip
state.enabled_components = manifest.enabled_components
state.aur_helper = getattr(installer, "helper", state.aur_helper)
state.save()
self.summarize(changeset, new_files, revived_files)
def fetch_manifest(self, state: DotsState, applied_rev: str) -> tuple[DotsSource, str, Manifest]:
print()
log("Fetching dots repo...")
source = DotsSource()
try:
source.ensure()
tip = source.tip_rev()
if tip == applied_rev:
info("Dots already up to date.")
sys.exit(0)
manifest = source.manifest_at(tip)
if source.has_rev(applied_rev):
known = set(source.manifest_at(applied_rev).components)
else:
# Treat all components as known if rev is invalid so we don't overwrite existing prefs
known = set(manifest.components)
except (SourceError, ManifestError) as e:
fatal(e)
# Enable components recorded at install time + any new components that are default on
enabled = [
name
for name, comp in manifest.components.items()
if name in state.enabled_components or (name not in known and comp.default)
]
# Let the user opt into any new optional components
new_comps = [name for name, comp in manifest.components.items() if name not in known and not comp.default]
if new_comps:
info(f"New components: {', '.join(new_comps)}")
enabled += prompt_selection(new_comps, "Components to enable?")
disabled = [name for name in manifest.components if name not in enabled]
try:
manifest.resolve_components(enable=enabled, disable=disabled)
except ComponentError as e:
fatal(e)
info(f"Enabled components: {', '.join(enabled) or 'none'}")
return source, tip, manifest
def deploy_changeset(
self, source: DotsSource, changeset: Changeset
) -> tuple[list[Path], list[Path], dict[str, str]]:
print()
if changeset.is_empty():
info("No configs to update.")
return [], [], {}
log("Updating configs...")
deployer = Deployer()
for repofile, dest in changeset.place:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
deployer.place_file(src, dest)
info(f"{repofile} -> {dest}")
new_files = []
for repofile, dest in changeset.conflicts:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
new_path = deployer.write_new(src, dest)
new_files.append(new_path)
warn(f"{dest} has local changes; upstream version written as {new_path.name}")
revived_files = []
for repofile, dest in changeset.deleted_changed:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
new_path = deployer.write_new(src, dest)
revived_files.append(new_path)
warn(f"{dest} was removed but changed upstream; upstream version written as {new_path.name}")
for dest in changeset.deletes:
deployer.remove(dest)
deployer.prune_empty_dirs(dest, Path.home())
info(f"Removed {dest}")
return new_files, revived_files, deployer.deployed_files
def sync_packages(self, installer: PackageInstaller, current: list[str], desired: list[str]) -> list[str]:
to_install = [p for p in desired if p not in current]
to_remove = [p for p in current if p not in desired]
installed = list(current)
if to_install:
print()
info(f"Installing new packages: {', '.join(to_install)}")
installer.install(to_install)
installed.extend(p for p in to_install if p not in installed)
if to_remove:
print()
info(f"Packages no longer required: {', '.join(to_remove)}")
selected = prompt_selection(to_remove, "Packages to remove?")
if selected:
installer.remove(selected)
installed = [p for p in installed if p not in selected]
return installed
def sync_local_packages(
self, installer: PackageInstaller, source: DotsSource, current: dict[str, list[str]], desired: list[str]
) -> dict[str, list[str]]:
to_build = [p for p in desired if p not in current]
to_remove = [p for p in current if p not in desired]
installed = dict(current)
if to_build:
print()
log(f"Building new local packages: {', '.join(to_build)}")
installed.update(build_local_packages(installer, source, to_build))
if to_remove:
print()
info(f"Local packages no longer required: {', '.join(to_remove)}")
selected = prompt_selection(to_remove, "Local packages to remove?")
if selected:
installer.remove([pkg for path in selected for pkg in current[path]])
for path in selected:
installed.pop(path, None)
return installed
def summarize(self, changeset: Changeset, new_files: list[Path], revived_files: list[Path]) -> None:
print()
conflicts = len(new_files) + len(revived_files)
info(f"Updated {len(changeset.place)} file(s), removed {len(changeset.deletes)}, {conflicts} conflict(s).")
if new_files:
info("The following files were changed upstream but you had edited them locally.")
info("Your versions were kept; the upstream versions were written alongside as .new:")
for path in new_files:
info(f" {path}")
if revived_files:
info("These files were removed by you but changed upstream, so were not restored.")
info("The upstream versions were written alongside as .new:")
for path in revived_files:
info(f" {path}")
if changeset.stale:
info("These files are no longer managed but differ from what was installed, so were kept:")
for path in changeset.stale:
info(f" {path}")
+28 -2
View File
@@ -2,10 +2,18 @@ import shutil
import tempfile
from pathlib import Path
from caelestia.utils.paths import cache_dir, config_dir, data_dir, dots_dir, state_dir
# Dirs to never prune even if empty
_PROTECTED_DIRS = frozenset({Path.home(), config_dir, data_dir, state_dir, cache_dir})
class Deployer:
"""Places files from the dots clone into their destinations."""
def __init__(self):
self.deployed_files: dict[str, str] = {}
def place(self, src: Path, dest: Path) -> None:
"""Place a whole entry (file or directory tree), replacing any existing dest."""
@@ -27,7 +35,7 @@ class Deployer:
elif path.is_dir():
(dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True)
def place_file(self, src: Path, dest: Path) -> None:
def place_file(self, src: Path, dest: Path, record: bool = True) -> None:
"""Atomically place a single file, replacing any existing dest."""
if dest.is_dir() and not dest.is_symlink():
@@ -44,11 +52,15 @@ class Deployer:
Path(f.name).unlink()
raise
if record:
# Keep relative to dots dir
self.deployed_files[str(dest)] = str(src.relative_to(dots_dir))
def write_new(self, src: Path, dest: Path) -> Path:
"""Write the upstream version alongside dest as <dest>.new and return that path."""
new_path = dest.parent / f"{dest.name}.new"
self.place_file(src, new_path)
self.place_file(src, new_path, record=False)
return new_path
def remove(self, path: Path) -> None:
@@ -56,3 +68,17 @@ class Deployer:
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
def prune_empty_dirs(self, start: Path, stop: Path) -> None:
"""Removes dirs recursively from start to stop.
Will never prune protected dirs (home, config, cache, etc).
"""
parent = start.parent
while parent != stop and stop in parent.parents and parent not in _PROTECTED_DIRS:
try:
parent.rmdir()
except OSError:
break
parent = parent.parent
+153
View File
@@ -0,0 +1,153 @@
from dataclasses import dataclass, field
from pathlib import Path
from caelestia.utils.dots.manifest import ManifestEntry
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.io import warn
class _Continue(Exception):
"""Signals the deployed-files loop to skip to the next entry."""
def _read_local(path: Path) -> bytes | None:
"""Read a local file, returning None if it can't be read (perms, is a dir, etc.)."""
try:
return path.read_bytes()
except OSError:
return None
@dataclass(frozen=True)
class Changeset:
place: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) to fast-forward
conflicts: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) -> write .new
deletes: list[Path] = field(default_factory=list) # We placed it, upstream removed it, unmodified
stale: list[Path] = field(default_factory=list) # Upstream removed it but user modified it
deleted_changed: list[tuple[str, Path]] = field(default_factory=list) # User deleted it, upstream changed -> .new
untracked: list[Path] = field(default_factory=list) # Gone + no longer managed; drop from state
remap: list[tuple[str, Path]] = field(default_factory=list) # Up to date but source path moved; restate mapping
def is_empty(self) -> bool:
return not (self.place or self.conflicts or self.deletes or self.stale or self.deleted_changed)
@staticmethod
def compute(
source: DotsSource,
applied_rev: str,
tip: str,
entries: list[ManifestEntry],
deployed: dict[str, str],
) -> "Changeset":
"""Collect all file changes needed into a Changeset."""
has_base = source.has_rev(applied_rev)
if not has_base:
warn(
"the previously applied revision is missing from the dots clone; files that differ "
"from the latest version will be written as .new instead of updated in place."
)
changed = set(source.changed_files(applied_rev, tip)) if has_base else set()
place: list[tuple[str, Path]] = []
conflicts: list[tuple[str, Path]] = []
deletes: list[Path] = []
stale: list[Path] = []
deleted_changed: list[tuple[str, Path]] = []
untracked: list[Path] = []
remap: list[tuple[str, Path]] = []
# Collect all files to deploy (entry sources can be dirs so we recurse into them)
to_deploy: dict[Path, str] = {}
for entry in entries:
src_root = str(entry.expanded_src())
repo_files = source.files_at(tip, src_root)
for dest in entry.expanded_dests():
for repo_file in repo_files:
to_deploy[dest / Path(repo_file).relative_to(src_root)] = repo_file
files_to_deploy = set(to_deploy)
# Already deployed files
for dest, src in deployed.items():
dest_path = Path(dest)
def try_read(rev: str, path: str) -> bytes:
try:
return source.blob_at(rev, path)
except SourceError:
# Read failed, keep it just in case
stale.append(dest_path)
raise _Continue
try:
if dest_path not in files_to_deploy: # No longer managed by any entry
if not dest_path.exists():
# Gone from disk and no entry manages it
untracked.append(dest_path)
continue
local = _read_local(dest_path)
if local is not None and has_base and try_read(applied_rev, src) == local:
deletes.append(dest_path)
else:
# Modified, or unreadable so we can't verify; keep it just in case
stale.append(dest_path)
else: # Still managed; `src` is what we last placed, `new_src` the current source
new_src = to_deploy[dest_path]
if not dest_path.exists():
# User deleted a managed file locally
if has_base and new_src == src and new_src not in changed:
continue # Respect the deletion; upstream has nothing new to offer
# Upstream changed it (or base is unknown): surface as .new, don't restore
deleted_changed.append((new_src, dest_path))
continue
if has_base and new_src == src and new_src not in changed:
continue # Unchanged upstream
dest_content = _read_local(dest_path)
if dest_content is None:
# Unreadable (perms, became a dir, ...); surface upstream as .new, don't clobber
conflicts.append((new_src, dest_path))
continue
if try_read(tip, new_src) == dest_content:
# Already up to date; restate the mapping if the source path moved
if new_src != src:
remap.append((new_src, dest_path))
continue
# Fast-forward only when the user hasn't edited since last deploy
if has_base and try_read(applied_rev, src) == dest_content:
place.append((new_src, dest_path))
else:
conflicts.append((new_src, dest_path))
except _Continue:
continue
# New files to deploy
for dest in files_to_deploy - set(Path(d) for d in deployed):
src = to_deploy[dest]
try:
new_content = source.blob_at(tip, src)
except SourceError:
# Failed to read the upstream blob; skip rather than abort the whole update
warn(f"could not read from source, skipping: {src}")
continue
if not dest.exists() or new_content == _read_local(dest):
# Dest nonexistent or already equal to new content
place.append((src, dest))
else:
# Differs, or exists but unreadable; surface upstream as .new
conflicts.append((src, dest))
return Changeset(
place=place,
conflicts=conflicts,
deletes=deletes,
stale=stale,
deleted_changed=deleted_changed,
untracked=untracked,
remap=remap,
)
+25 -19
View File
@@ -7,6 +7,8 @@ from pathlib import Path
from string import Template
from typing import Any
from caelestia.utils.io import warn
_XDG_DEFAULTS = {
"XDG_CONFIG_HOME": str(Path.home() / ".config"),
"XDG_DATA_HOME": str(Path.home() / ".local/share"),
@@ -25,36 +27,38 @@ class ComponentError(Exception):
"""Raised when component flags are invalid or contradictory."""
def expand(text: str) -> Path:
def _expand(text: str) -> Path:
"""Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path."""
env = {**_XDG_DEFAULTS, **os.environ}
return Path(Template(text).safe_substitute(env)).expanduser()
def expand_dests(dest: str) -> list[Path]:
"""Expand globs within a dest path.
Globs from the start until the segment with the last glob so subdirs are
created if they didn't exist previously.
"""
expanded = expand(dest)
if not _GLOB_MAGIC.search(str(expanded)):
return [expanded]
parts = expanded.parts
glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part))
pattern = str(Path(*parts[: glob_idx + 1]))
tail = parts[glob_idx + 1 :]
return [Path(match, *tail) for match in sorted(glob.glob(pattern))]
@dataclass(frozen=True)
class ManifestEntry:
src: str
dest: str
def expanded_src(self) -> Path:
return _expand(self.src)
def expanded_dests(self) -> list[Path]:
"""The dest path with globs expanded.
Globs from the start until the segment with the last glob so subdirs are
created if they didn't exist previously.
"""
expanded = _expand(self.dest)
if not _GLOB_MAGIC.search(str(expanded)):
return [expanded]
parts = expanded.parts
glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part))
pattern = str(Path(*parts[: glob_idx + 1]))
tail = parts[glob_idx + 1 :]
return [Path(match, *tail) for match in sorted(glob.glob(pattern))]
@dataclass(frozen=True)
class ManifestComponent:
@@ -103,6 +107,8 @@ class Manifest:
components = {}
for comp in raw.get("components", []):
parsed = _parse_component(comp)
if parsed.name in components:
warn(f"duplicate component '{parsed.name}'; using the last definition")
components[parsed.name] = parsed
return Manifest(
+39
View File
@@ -0,0 +1,39 @@
import os
import subprocess
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.packages import PackageInstaller
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import info, log, warn
from caelestia.utils.paths import dots_dir
def build_local_packages(installer: PackageInstaller, source: DotsSource, paths: list[str]) -> dict[str, list[str]]:
"""Build and install each local PKGBUILD dir, returning {path: installed package names}."""
built: dict[str, list[str]] = {}
for path in paths:
directory = source.working_path(path)
if not directory.is_dir():
warn(f"missing in repo, skipping: {path}")
continue
log(f"Building {path}...")
built[path] = installer.build_install(directory)
return built
def run_hooks(manifest: Manifest, kind: str) -> None:
"""Run the global + enabled components' hooks of the given kind (e.g. "post_install")."""
hooks = manifest.enabled_hooks(kind)
if not hooks:
return
print()
log(f"Running {kind.replace('_', '-')} hooks...")
env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)}
for hook in hooks:
info(f"Running hook: {hook}")
result = subprocess.run(hook, shell=True, env=env)
if result.returncode != 0:
warn(f"hook exited with {result.returncode}")
+9
View File
@@ -73,6 +73,9 @@ class PackageInstaller(ABC):
def build_install(self, directory: Path) -> list[str]:
"""Build and install the PKGBUILD in `directory`, returning the installed package names."""
@abstractmethod
def system_update(self) -> None: ...
class NoopInstaller(PackageInstaller):
"""Used off Arch, where the dots' packages are not available via pacman/AUR."""
@@ -89,6 +92,9 @@ class NoopInstaller(PackageInstaller):
info(f"Skipping local package build (not on Arch): {directory}")
return []
def system_update(self) -> None:
info("Skipping system update (not on Arch)")
class ArchInstaller(PackageInstaller):
def __init__(self, helper: str, noconfirm: bool = False) -> None:
@@ -128,3 +134,6 @@ class ArchInstaller(PackageInstaller):
subprocess.run(["makepkg", "-fsi", *self.flags], cwd=directory, env=env, check=True)
return names
def system_update(self) -> None:
subprocess.run([self.helper, "-Syu", *self.flags], check=True)
+19 -3
View File
@@ -17,6 +17,8 @@ class DotsSource:
cfg = get_config().get("dots", {})
self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git")
self.branch = cfg.get("branch", "main")
# Cache git blobs by (ref, relpath); objects are immutable for a given rev
self._blob_cache: dict[tuple[str, str], bytes] = {}
@property
def remote_ref(self) -> str:
@@ -68,6 +70,15 @@ class DotsSource:
out = self._git("diff", "--name-only", base, head)
return [line for line in out.splitlines() if line]
def has_rev(self, rev: str) -> bool:
"""Whether `rev` resolves to a commit."""
try:
self._git("rev-parse", "--verify", "--quiet", f"{rev}^{{commit}}")
return True
except SourceError:
return False
def clean(self) -> None:
"""Remove all untracked files in the git repo."""
self._git("clean", "-fdx")
@@ -81,7 +92,10 @@ class DotsSource:
return self._git("show", f"{ref}:{relpath}")
def blob_at(self, ref: str, relpath: str) -> bytes:
return self._git_bytes("show", f"{ref}:{relpath}")
key = (ref, relpath)
if key not in self._blob_cache:
self._blob_cache[key] = self._git_bytes("show", f"{ref}:{relpath}")
return self._blob_cache[key]
def files_at(self, ref: str, relpath: str) -> list[str]:
"""Repo-relative paths of all files under relpath at ref (the path itself if it is a file)."""
@@ -92,10 +106,12 @@ class DotsSource:
# --- Helpers ---
def _git(self, *args: str) -> str:
return self._run("git", "-C", str(dots_dir), *args)
# core.quotePath=false so non-ASCII paths come back verbatim, not octal-escaped
return self._run("git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args)
def _git_bytes(self, *args: str) -> bytes:
result = subprocess.run(["git", "-C", str(dots_dir), *args], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = ["git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed")
return result.stdout
+6
View File
@@ -21,6 +21,10 @@ class DotsState:
packages: list[str] = field(default_factory=list)
local_packages: dict[str, list[str]] = field(default_factory=dict)
# Files placed by the last deploy. Only files, not directories
# Maps dest -> src
deployed_files: dict[str, str] = field(default_factory=dict)
@staticmethod
def load() -> "DotsState":
try:
@@ -37,6 +41,7 @@ class DotsState:
enabled_components=data.get("enabled_components", []),
packages=data.get("packages", []),
local_packages=data.get("local_packages", {}),
deployed_files=data.get("deployed_files", {}),
)
def save(self) -> None:
@@ -48,5 +53,6 @@ class DotsState:
"enabled_components": self.enabled_components,
"packages": self.packages,
"local_packages": self.local_packages,
"deployed_files": self.deployed_files,
},
)
+51
View File
@@ -80,6 +80,57 @@ def confirm(msg: str, prefix: bool = True, default: bool = True) -> bool:
return answer in ("y", "yes")
def prompt_selection(items: list[str], header: str) -> list[str]:
"""Prompt the user to pick from a numbered list, returning the selected items.
Accepts `[A]ll`/`a`, single indices, ranges (`1-3`) and exclusions (`^4`).
Empty input selects nothing. Re-prompts until the input parses.
"""
print(format_msg(PROMPT_COLOUR, True, header))
max_idx_w = len(str(len(items)))
for i, item in enumerate(items):
print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{item}"))
print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)"))
def valid_idx(v: str) -> int:
try:
idx = int(v, base=10) - 1 # -1 to translate to 0 index
except ValueError:
raise ValueError(f'Given value "{v}" must be an integer.')
if idx < 0 or idx >= len(items):
raise ValueError(f'Given value "{v}" must be between 1 and {len(items)} inclusive.')
return idx
def parse(ans: str) -> list[str]:
if ans in ("a", "all"):
return list(items)
if not ans:
return []
selected: list[str] = []
for tok in ans.split():
fr, sep, to = tok.partition("-")
if sep:
lo, hi = valid_idx(fr), valid_idx(to)
if lo > hi:
raise ValueError(f'Given range "{tok}" must be lo-hi.')
selected += items[lo : hi + 1]
elif tok.startswith("^"):
t = valid_idx(tok[1:])
selected += items[:t] + items[t + 1 :]
else:
selected.append(items[valid_idx(tok)])
return list(set(selected))
while True:
ans = prompt("", end="").lower().strip()
try:
return parse(ans)
except ValueError as e:
warn(f"invalid input. {e} Please try again.")
def pause() -> None:
if _disable_input:
return