86 Commits

Author SHA1 Message Date
2 * r + 2 * t b00dabaa93 chore: update readme 2026-06-20 14:35:26 +10:00
2 * r + 2 * t 9cae22e790 feat: remove app2unit 2026-06-20 14:28:03 +10:00
github-actions 0518ca0131 [CI] chore: update flake 2026-06-20 04:16:19 +00:00
2 * r + 2 * t 702baa117b fix: resolve actual package names
provides=... allows aliasing packages to others, and pacman -D only
takes the actual package names, so we need to resolve them.
2026-06-19 20:17:50 +10:00
2 * r + 2 * t 2e9a387951 fix: update local packages during update cmd (#128)
* fix: update local packages based on pkgver/rel

* fixes

* fix: default no rebuild on vercmp fail
2026-06-19 18:17:02 +10:00
2 * r + 2 * t cfc62f683e fix: deref legacy symlinks (#127)
* fix: deref legacy syms before deploy

Deploy will completely overwrite symlinked dirs, so you'd lose all extra
content inside them
Also deref syms in config backup before deleting legacy dir

* fix: restore link if deref fails
2026-06-19 17:11:39 +10:00
2 * r + 2 * t 096e583618 fix: only match dirs when globs if a tail is given 2026-06-19 02:40:13 +10:00
2 * r + 2 * t 8535338e6f feat: add post package hook + deploy after package
Deploy after package install on install command. Update command stays
the same
2026-06-19 01:28:47 +10:00
2 * r + 2 * t 6c3b69cb84 fix: catch errors with package installation 2026-06-18 22:36:59 +10:00
2 * r + 2 * t f53b3d036f feat: add migration step to install cmd (#126)
* feat: add migration step to install cmd

* fix: set packages to explicitly installed

* fix: legacy remote check

Oops

* fix: generator

* fix: better legacy detection

* fix: run legacy detection before deployment

Also fix unlink on dir

* fix: legacy file check issue

* fix: handle no legacy

* fix: make sure not to go past home when looking for repo

* fix: wrong dir for default legacy path

* fix: catch errors with deleting legacy install
2026-06-18 21:40:05 +10:00
github-actions 7ff0913826 [CI] chore: update flake 2026-06-18 04:43:48 +00:00
2 * r + 2 * t 32a88d4d62 Merge pull request #125 from caelestia-dots/feat/add-update-cmd
feat: add update cmd
2026-06-18 00:59:39 +10:00
2 * r + 2 * t 3d51f46b21 feat: cache git blobs 2026-06-18 00:04:23 +10:00
2 * r + 2 * t 91e55a322f fix: account for fs errors when reading files 2026-06-17 23:57:10 +10:00
2 * r + 2 * t 844f6d22b3 fix: account for moved src 2026-06-17 23:56:42 +10:00
2 * r + 2 * t c09cd1a609 fix: warn on duplicate components in manifest 2026-06-17 23:17:24 +10:00
2 * r + 2 * t 0410fed68c fix: remove untracked files from state 2026-06-17 23:17:08 +10:00
2 * r + 2 * t d83a85745d fix: save state after each update phase
So cancellations per phase don't leave state partially wrong
Also part 2 of the previous commit, wire into updater
2026-06-17 22:13:10 +10:00
2 * r + 2 * t 68a758a53b fix: handle user deleted but upstream changed properly
Also catch error reading blob for new files
2026-06-17 22:11:39 +10:00
2 * r + 2 * t 338c78f789 fix: disable git transforming weird chars
Git transforms non ascii and other chars into octal escaped versions,
which we don't want
2026-06-17 22:10:17 +10:00
2 * r + 2 * t be13e5897b feat: add completions for update cmd 2026-06-17 21:28:58 +10:00
2 * r + 2 * t 4824483bba feat: implement update command 2026-06-17 21:27:30 +10:00
2 * r + 2 * t a0aa37bb9b fix: remove duplicate resolve component 2026-06-17 21:05:46 +10:00
2 * r + 2 * t 710cba39c3 refactor: reusable select prompt + hooks + local build 2026-06-17 20:57:17 +10:00
2 * r + 2 * t c8e18ef6ed refactor: use aur helpers constant for parser 2026-06-17 20:21:21 +10:00
2 * r + 2 * t 222023f6d5 fix: handle applied rev not existing in diff 2026-06-17 20:20:50 +10:00
2 * r + 2 * t 7def47d120 feat: add diff module 2026-06-17 17:46:55 +10:00
github-actions 5e2335feb9 [CI] chore: update flake 2026-06-17 04:49:48 +00:00
2 * r + 2 * t be61b8b072 feat: record files deployed 2026-06-17 01:52:31 +10:00
2 * r + 2 * t 0980986ed4 refactor: move expand/expand_dests to entry methods 2026-06-17 00:58:06 +10:00
github-actions 63a6e5a6f2 [CI] chore: update flake 2026-06-16 05:09:25 +00:00
2 * r + 2 * t ecf6c2723d Merge pull request #123 from caelestia-dots/feat/add-install-cmd
feat: add install command
2026-06-16 01:33:40 +10:00
2 * r + 2 * t 342dfc71e1 fix: wrong docstring 2026-06-16 01:22:30 +10:00
2 * r + 2 * t 51e858b73f fix: mutable default param 2026-06-16 01:22:30 +10:00
2 * r + 2 * t 7d9b685918 fix: only fetch source once 2026-06-16 01:22:30 +10:00
2 * r + 2 * t 1f5b39281c feat: add completions for install cmd 2026-06-15 23:51:35 +10:00
2 * r + 2 * t a8f0dc3271 feat: add install command to parser 2026-06-15 23:48:00 +10:00
2 * r + 2 * t e02fc7427d feat: allow disabling print prefix 2026-06-15 23:47:53 +10:00
2 * r + 2 * t 56f2e94d5b fix: no hooks subobject for consistency
So global hooks are consistent with per component hooks
2026-06-15 23:46:56 +10:00
2 * r + 2 * t d1ed5d9db1 fix: deterministic component ordering
Keep manifest order for package installation
2026-06-15 23:46:56 +10:00
2 * r + 2 * t 73bc3aadab feat: retry on invalid input instead of exiting 2026-06-15 23:46:56 +10:00
2 * r + 2 * t d55647fd03 feat: add more info at end of install
Also add newlines between sections
2026-06-15 23:46:56 +10:00
2 * r + 2 * t 1fc51410fc chore: log -> info for hooks 2026-06-15 23:46:56 +10:00
2 * r + 2 * t a8d67b44ee fix: stop makepkg from resetting sudo 2026-06-15 23:46:56 +10:00
2 * r + 2 * t c93fa1488e fix: align component prompt regardless of digits 2026-06-15 23:46:56 +10:00
2 * r + 2 * t 024df497d1 fix: re-clone repo if url changed 2026-06-15 22:57:16 +10:00
2 * r + 2 * t 994f2d86f5 feat: prompt installing optional components 2026-06-15 22:57:16 +10:00
2 * r + 2 * t efd59b79d9 fix: catch source errors 2026-06-15 22:57:16 +10:00
2 * r + 2 * t e6031ad544 feat: add script for testing in sandbox 2026-06-15 22:57:16 +10:00
github-actions 5c062e6897 [CI] chore: update flake 2026-06-15 05:12:57 +00:00
2 * r + 2 * t f85103eac5 fix: deployer place dir docstring 2026-06-14 21:39:01 +10:00
2 * r + 2 * t aef48072ec fix: actually use component error 2026-06-14 21:29:56 +10:00
2 * r + 2 * t 216547c9c1 fix: ensure only single resolve for manifest comps 2026-06-14 21:25:53 +10:00
2 * r + 2 * t 8627b7b96f fix: use tempdir for aur helper install 2026-06-14 21:23:10 +10:00
2 * r + 2 * t 44df61b22d refactor: set default aur helper to constant 2026-06-14 21:22:58 +10:00
2 * r + 2 * t 36a6029a2c feat: add install command
Not wired yet
2026-06-14 21:14:33 +10:00
github-actions 4090c4fc91 [CI] chore: update flake 2026-06-14 04:57:48 +00:00
2 * r + 2 * t 393dbf6363 feat: allow disabling input in io module 2026-06-13 20:19:22 +10:00
2 * r + 2 * t 586f4d9665 fix: allow logging exceptions + Never fatal return 2026-06-13 20:18:48 +10:00
2 * r + 2 * t 14732e9850 fix: handle ctrl+c/d cleanly
Also add newline before pause prompt
2026-06-13 03:12:06 +10:00
2 * r + 2 * t 1c707d3a16 refactor: make resizer use new loggers 2026-06-13 02:59:06 +10:00
2 * r + 2 * t c236823b76 chore: format 2026-06-13 02:59:06 +10:00
2 * r + 2 * t 002a9c287f refactor: add get_config func 2026-06-13 02:59:06 +10:00
2 * r + 2 * t d7b65b5946 refactor: move atomic write to paths
Also make it a true atomic write via os.rename (create temp in parent
dir so guaranteed same fs)
2026-06-13 02:14:11 +10:00
2 * r + 2 * t c860b389c3 refactor: rename logging -> io + add more funcs 2026-06-13 01:50:55 +10:00
github-actions 3f3229aed4 [CI] chore: update flake 2026-06-12 04:44:32 +00:00
github-actions b790c32715 [CI] chore: update flake 2026-06-11 04:41:47 +00:00
github-actions 6ddbb4f1c3 [CI] chore: update flake 2026-06-10 04:19:30 +00:00
github-actions 05b7714289 [CI] chore: update flake 2026-06-09 04:08:00 +00:00
github-actions 84790f8fc3 [CI] chore: update flake 2026-06-08 04:53:59 +00:00
github-actions 505a02f5ab [CI] chore: update flake 2026-06-07 04:47:43 +00:00
github-actions d1c8c8fc09 [CI] chore: update flake 2026-06-02 04:44:32 +00:00
2 * r + 2 * t ad533a0dd4 fix: only screenshot focused monitor in fs mode 2026-06-01 21:06:04 +10:00
İlyas ccd2712982 fix: Lua dispatcher compat (#112)
* fix: temporary Lua dispatcher compat for workspace dispatchers

* fix(resizer): add Lua dispatcher compat for window resize/move/float/center

* feat(theme): write current.lua for Hyprland Lua config, current.conf for hyprlang

* fix: align gen_lua format with #111

* refactor address review feedback
refactor(hypr,theme): address review feedback

- cache is_lua_config result to avoid redundant socket calls
- remove is_lua_config wrapper, rename _is_lua_config to is_lua_config
- move hypr import to top of theme.py
- use single line syntax in apply_hypr and apply_colours

* restore original specialws behavior and some formatting
2026-05-31 23:48:33 +10:00
github-actions 1ea661859d [CI] chore: update flake 2026-05-31 04:25:32 +00:00
github-actions 64a5507e74 [CI] chore: update flake 2026-05-26 04:08:39 +00:00
github-actions 7fa3fc1bd0 [CI] chore: update flake 2026-05-24 04:20:58 +00:00
github-actions 7f30062670 [CI] chore: update flake 2026-05-23 04:01:11 +00:00
github-actions 04d286eaff [CI] chore: update flake 2026-05-17 04:09:50 +00:00
github-actions 2ce6213698 [CI] chore: update flake 2026-05-13 03:57:27 +00:00
Zynix 4b3ffcd644 fix: defer DynamicScheme annotation evaluation (#110) 2026-05-11 16:48:11 +10:00
github-actions 2621724c55 [CI] chore: update flake 2026-05-10 04:03:00 +00:00
github-actions 7b8a4281aa [CI] chore: update flake 2026-05-07 03:45:02 +00:00
github-actions 7452974dc9 [CI] chore: update flake 2026-05-06 03:53:45 +00:00
github-actions 544b567668 [CI] chore: update flake 2026-05-05 03:33:35 +00:00
github-actions 1f523c7556 [CI] chore: update flake 2026-05-03 04:00:27 +00:00
28 changed files with 2054 additions and 179 deletions
+7 -2
View File
@@ -8,7 +8,6 @@ The main control script for the Caelestia dotfiles.
- [`swappy`](https://github.com/jtheoof/swappy) - screenshot editor
- [`grim`](https://gitlab.freedesktop.org/emersion/grim) - taking screenshots
- [`dart-sass`](https://github.com/sass/dart-sass) - discord theming
- [`app2unit`](https://github.com/Vladimir-csp/app2unit) - launching apps
- [`wl-clipboard`](https://github.com/bugaevc/wl-clipboard) - copying to clipboard
- [`slurp`](https://github.com/emersion/slurp) - selecting an area
- [`gpu-screen-recorder`](https://git.dec05eba.com/gpu-screen-recorder/about) - screen recording
@@ -77,7 +76,7 @@ Install all [dependencies](#dependencies), then install
e.g. via an AUR helper (yay)
```sh
yay -S libnotify swappy grim dart-sass app2unit wl-clipboard slurp gpu-screen-recorder glib2 cliphist fuzzel python-build python-installer python-hatch python-hatch-vcs
yay -S libnotify swappy grim dart-sass wl-clipboard slurp gpu-screen-recorder glib2 cliphist fuzzel python-build python-installer python-hatch python-hatch-vcs
```
Now, clone the repo, `cd` into it, build the wheel via `python -m build --wheel`
@@ -159,6 +158,8 @@ subcommands:
emoji emoji/glyph utilities
wallpaper manage the wallpaper
resizer window resizer daemon
install install the Caelestia dotfiles
update update the Caelestia dotfiles
```
### User templates
@@ -256,6 +257,10 @@ All configuration options are in `~/.config/caelestia/cli.json`.
"move": true
}
}
},
"dots": {
"url": "https://github.com/caelestia-dots/caelestia.git",
"branch": "main"
}
}
```
+9
View File
@@ -0,0 +1,9 @@
#!/usr/bin/env sh
export HOME=/tmp/install-test
export XDG_CONFIG_HOME=$HOME/.config
export XDG_DATA_HOME=$HOME/.local/share
export XDG_STATE_HOME=$HOME/.local/state
export XDG_CACHE_HOME=$HOME/.cache
"$@"
+13 -1
View File
@@ -1,7 +1,7 @@
set -l seen '__fish_seen_subcommand_from'
set -l has_opt '__fish_contains_opt'
set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer
set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install update
set -l not_seen "not $seen $commands"
# Disable file completions
@@ -20,6 +20,8 @@ complete -c caelestia -n $not_seen -a 'clipboard' -d 'Open clipboard history'
complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities'
complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper'
complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer'
complete -c caelestia -n $not_seen -a 'install' -d 'Install the Caelestia dotfiles'
complete -c caelestia -n $not_seen -a 'update' -d 'Update the Caelestia dotfiles'
# Shell
set -l commands mpris drawers wallpaper notifs
@@ -126,3 +128,13 @@ complete -c caelestia -n "$seen emoji" -s 'f' -l 'fetch' -d 'Fetch emoji/glyph d
complete -c caelestia -n "$seen resizer" -s 'd' -l 'daemon' -d 'Start in daemon mode'
complete -c caelestia -n "$seen resizer" -a 'pip' -d 'Quick pip mode'
complete -c caelestia -n "$seen resizer" -a 'active' -d 'Select the active window'
# Install (component flags come from the manifest, so are not completed statically)
complete -c caelestia -n "$seen install" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r
complete -c caelestia -n "$seen install" -l 'enable-components' -d 'List of components to enable' -r
complete -c caelestia -n "$seen install" -l 'disable-components' -d 'List of components to disable' -r
complete -c caelestia -n "$seen install" -l 'noconfirm' -d 'Use defaults for all prompts'
# Update
complete -c caelestia -n "$seen update" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r
complete -c caelestia -n "$seen update" -l 'noconfirm' -d 'Use defaults for all prompts'
+4 -5
View File
@@ -8,7 +8,7 @@
slurp,
wl-clipboard,
cliphist,
app2unit,
xdg-utils,
dart-sass,
grim,
fuzzel,
@@ -46,7 +46,7 @@ python3.pkgs.buildPythonApplication {
slurp
wl-clipboard
cliphist
app2unit
xdg-utils
dart-sass
grim
fuzzel
@@ -65,11 +65,10 @@ python3.pkgs.buildPythonApplication {
substituteInPlace src/caelestia/subcommands/screenshot.py \
--replace-fail '"qs", "-c", "caelestia"' '"caelestia-shell"'
# Use config bin instead of discord + fix todoist + fix app2unit
# Use config bin instead of discord + fix todoist
substituteInPlace src/caelestia/subcommands/toggle.py \
--replace-fail 'discord' ${discordBin} \
--replace-fail '["todoist"]' '["todoist.desktop"]'\
--replace-fail 'app2unit' ${app2unit}/bin/app2unit
--replace-fail '["todoist"]' '["todoist.desktop"]'
# Use config style instead of darkly
substituteInPlace src/caelestia/data/templates/qtengine.json \
Generated
+28 -10
View File
@@ -3,17 +3,18 @@
"caelestia-shell": {
"inputs": {
"caelestia-cli": [],
"m3shapes": "m3shapes",
"nixpkgs": [
"nixpkgs"
],
"quickshell": "quickshell"
},
"locked": {
"lastModified": 1777688289,
"narHash": "sha256-2EaEVkT1oUpjLLp7uEY/hDYDOa2k5R1YgcJpHei+lUM=",
"lastModified": 1781850732,
"narHash": "sha256-YKAWz4bSguUWwc1GxOHXRFl4fT+t9WnA2VoZGIRdFVc=",
"owner": "caelestia-dots",
"repo": "shell",
"rev": "4e9e1f4b723f7e3a87cb280d67a25ee92c87fbff",
"rev": "37e603fbf6f973a09f451553b61ac584d9877cf1",
"type": "github"
},
"original": {
@@ -22,13 +23,30 @@
"type": "github"
}
},
"m3shapes": {
"flake": false,
"locked": {
"lastModified": 1781017666,
"narHash": "sha256-kfHyzZaPHgqZML48OA+5JwBOsLdQJ2ci/aGPShvUB4Y=",
"owner": "soramanew",
"repo": "m3shapes",
"rev": "bdc327b29f95394a732baf3c9b19658ba23755b6",
"type": "github"
},
"original": {
"owner": "soramanew",
"repo": "m3shapes",
"rev": "bdc327b29f95394a732baf3c9b19658ba23755b6",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1777268161,
"narHash": "sha256-bxrdOn8SCOv8tN4JbTF/TXq7kjo9ag4M+C8yzzIRYbE=",
"lastModified": 1781577229,
"narHash": "sha256-lrp67w8AulE9Ks53n27I45ADSzbOCn4H+CNW1Ck8B+8=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "1c3fe55ad329cbcb28471bb30f05c9827f724c76",
"rev": "567a49d1913ce81ac6e9582e3553dd90a955875f",
"type": "github"
},
"original": {
@@ -46,11 +64,11 @@
]
},
"locked": {
"lastModified": 1777341401,
"narHash": "sha256-QEAVYeXxvTamsYJVBq8+qSJV9ml2MxqRaZvkobfuPWA=",
"lastModified": 1781053488,
"narHash": "sha256-P4WEBaKgl8flRckHxXGHzT0potPvB3x8ZFIp9gLEAMY=",
"ref": "refs/heads/master",
"rev": "0baa81aa03559ca315668e5a306364cddf1a6f49",
"revCount": 812,
"rev": "d99d87d5e5ec4e696815348692fdaaf0b6be1b2c",
"revCount": 822,
"type": "git",
"url": "https://git.outfoxxed.me/outfoxxed/quickshell"
},
+4
View File
@@ -1,8 +1,10 @@
from caelestia.parser import parse_args
from caelestia.utils.io import log
from caelestia.utils.version import print_version
def main() -> None:
try:
parser, args = parse_args()
if args.version:
print_version()
@@ -10,3 +12,5 @@ def main() -> None:
args.cls(args).run()
else:
parser.print_help()
except KeyboardInterrupt:
log("Exiting...")
+76 -1
View File
@@ -1,6 +1,23 @@
import argparse
import sys
from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper
from caelestia.subcommands import (
clipboard,
emoji,
install,
record,
resizer,
scheme,
screenshot,
shell,
toggle,
update,
wallpaper,
)
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.packages import AUR_HELPERS
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import warn
from caelestia.utils.paths import wallpapers_dir
from caelestia.utils.scheme import get_scheme_names, scheme_variants
from caelestia.utils.wallpaper import get_wallpaper
@@ -128,4 +145,62 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
resizer_parser.add_argument("height", nargs="?", help="height to resize to")
resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)")
# Create parser for install opts
install_parser = command_parser.add_parser(
"install",
help="install the Caelestia dotfiles",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
install_parser.set_defaults(cls=install.Command)
install_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use")
install_parser.add_argument(
"--enable-components", metavar="LIST", help="comma-separated list of components to enable"
)
install_parser.add_argument(
"--disable-components", metavar="LIST", help="comma-separated list of components to disable"
)
install_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
_set_install_epilog(install_parser)
# Create parser for update opts
update_parser = command_parser.add_parser("update", help="update the Caelestia dotfiles")
update_parser.set_defaults(cls=update.Command)
update_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use")
update_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
return parser, parser.parse_args()
def _set_install_epilog(install_parser: argparse.ArgumentParser) -> None:
"""Add components if using install subcommand"""
if len(sys.argv) > 1 and sys.argv[1] == "install":
manifest = _load_install_manifest()
if manifest is not None and manifest.components:
install_parser.epilog = _components_epilog(manifest)
def _load_install_manifest() -> Manifest | None:
source = DotsSource()
try:
source.ensure()
return source.manifest_at(source.remote_ref)
except Exception as e:
warn(f"failed to load manifest from dots repo ({e})\n", prefix=False)
return None
def _components_epilog(manifest: Manifest) -> str:
def e(*v: int) -> str:
return f"\033[{';'.join(str(c) for c in v)}m"
def b(c: int) -> str:
return e(1, c)
reset = e(0)
width = max(len(name) for name in manifest.components)
lines = [f"{b(34)}available components (for --enable-components / --disable-components):{reset}"]
for name, comp in manifest.components.items():
lines.append(f" {b(32)}{name:<{width}}{reset}\t{'(default)' if comp.default else '(off)'}")
return "\n".join(lines)
+266
View File
@@ -0,0 +1,266 @@
import shutil
import textwrap
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.legacy import (
LEGACY_META_PKG,
detect_legacy_repo,
legacy_config_symlinks,
legacy_symlinks,
legacy_to_delete,
)
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError
from caelestia.utils.dots.misc import build_local_packages, run_hooks
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER, PackageError, PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import confirm, disable_input, fatal, info, log, pause, prompt_selection, warn
from caelestia.utils.paths import (
config_backup_dir,
config_dir,
)
def _parse_list_arg(value: str | None) -> list[str] | None:
if value is None:
return None
return [item.strip() for item in value.split(",") if item.strip()]
def _deref_symlink(link: Path, target: Path) -> None:
"""Replace symlink `link` with a real copy of `target`'s content."""
bak = link.rename(link.parent / f"{link.name}.bak")
try:
if target.is_dir():
shutil.copytree(target, link, symlinks=True)
else:
shutil.copy2(target, link)
except OSError:
bak.rename(link)
raise
bak.unlink()
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.noconfirm:
disable_input()
self.print_greeting()
self.create_backup()
legacy_dir = detect_legacy_repo() # Detect legacy repo first cause deploy overwrites legacy syms
source, tip, manifest = self.fetch_manifest()
try:
installer, packages, local_packages = self.install_packages(source, manifest)
except PackageError as e:
fatal(e)
run_hooks(manifest, "post_package")
self.dereference_legacy(legacy_dir) # Copy legacy content into place before deploy overwrites the symlinks
deployed = self.deploy_configs(source, manifest)
run_hooks(manifest, "post_install")
DotsState(
aur_helper=getattr(installer, "helper", DEFAULT_AUR_HELPER),
applied_rev=tip,
enabled_components=manifest.enabled_components,
packages=packages,
local_packages=local_packages,
deployed_files=deployed,
).save()
self.migrate_legacy(installer, legacy_dir)
self.print_done()
def print_greeting(self) -> None:
print(
"\033[38;2;150;241;241m" # Caelestia colour
+ textwrap.dedent(
r"""
╭─────────────────────────────────────────────────╮
│ ______ __ __ _ │
│ / ____/___ ____ / /__ _____/ /_(_)___ _ │
│ / / / __ `/ _ \/ / _ \/ ___/ __/ / __ `/ │
│ / /___/ /_/ / __/ / __(__ ) /_/ / /_/ / │
\____/\__,_/\___/_/\___/____/\__/_/\__,_/ │
│ │
╰─────────────────────────────────────────────────╯
"""
)
+ "\033[0m"
)
info("Welcome to the Caelestia dotfiles installer!")
info("Here's a quick overview on what this command is going to do:")
info(" - Install dependencies")
info(" - Install config files")
info("The installer does NOT set up hardware/system level configs (e.g. drivers). Please do this yourself.")
pause()
print()
def create_backup(self) -> None:
if config_dir.exists():
if not confirm("Back up the config directory?", default=True):
return
log(f"Creating a backup of {config_dir}...")
if config_backup_dir.exists():
if not confirm("A backup already exists, overwrite?", default=False):
info("Not creating backup.")
return
log("Deleting old backup...")
shutil.rmtree(config_backup_dir)
shutil.copytree(config_dir, config_backup_dir, symlinks=True)
info(f"Created backup at {config_backup_dir}")
def fetch_manifest(self) -> tuple[DotsSource, str, Manifest]:
print()
log("Fetching dots repo...")
source = DotsSource()
try:
source.ensure()
tip = source.checkout_tip()
except SourceError as e:
fatal(e)
enable = _parse_list_arg(self.args.enable_components)
disable = _parse_list_arg(self.args.disable_components)
try:
manifest = source.manifest_at(tip)
# No flags given, prompt user for non-default components
if enable is None and disable is None:
optional = [name for name, comp in manifest.components.items() if not comp.default]
if optional:
enable = prompt_selection(optional, "Components to enable?")
manifest.resolve_components(enable=enable, disable=disable)
except (SourceError, ManifestError, ComponentError) as e:
fatal(e)
names = ", ".join(manifest.enabled_components) or "none"
info(f"Enabled components: {names}")
return source, tip, manifest
def deploy_configs(self, source: DotsSource, manifest: Manifest) -> dict[str, str]:
print()
log("Installing configs...")
deployer = Deployer()
for entry in manifest.enabled_entries():
src = source.working_path(entry.expanded_src())
if not src.exists():
warn(f"missing in source, skipping: {entry.src}")
continue
dests = entry.expanded_dests()
if not dests:
warn(f"dest glob matched nothing, skipping: {entry.dest}")
continue
for dest in dests:
deployer.place(src, Path(dest))
info(f"{entry.src} -> {dest}")
return deployer.deployed_files
def install_packages(
self, source: DotsSource, manifest: Manifest
) -> tuple[PackageInstaller, list[str], dict[str, list[str]]]:
installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm)
packages = manifest.enabled_packages()
if packages:
print()
log("Installing packages...")
installer.install(packages)
local_packages = {}
local_dirs = manifest.enabled_local_packages()
if local_dirs:
print()
log("Building local packages...")
local_packages = build_local_packages(installer, source, local_dirs)
return installer, packages, local_packages
def dereference_legacy(self, legacy_dir: Path | None) -> None:
"""Replace legacy symlinks with real copies of their targets."""
symlinks = legacy_symlinks(legacy_dir)
if not symlinks:
return
print()
log("Preserving content from legacy symlinks...")
for path in symlinks:
target = path.resolve()
if not target.exists():
continue
try:
_deref_symlink(path, target)
info(f"Copied {target} -> {path}")
except OSError as e:
warn(f"failed to preserve {path}: {e}")
def deref_backup_syms(self, legacy_dir: Path | None) -> None:
"""Deref the backup's legacy symlinks before the repo is cleared, so the backup keeps real content."""
if not config_backup_dir.is_dir():
return
for link in legacy_config_symlinks(config_backup_dir, legacy_dir):
target = link.resolve()
if not target.exists():
continue
try:
_deref_symlink(link, target)
except OSError as e:
warn(f"failed to preserve {link} in backup: {e}")
def migrate_legacy(self, installer: PackageInstaller, legacy_dir: Path | None) -> None:
"""Clean up a previous install.fish setup (repo, symlinks and metapackage)."""
to_delete = legacy_to_delete(legacy_dir)
meta_installed = installer.is_installed(LEGACY_META_PKG)
if not to_delete and not meta_installed:
return
print()
log("Found a legacy Caelestia installation...")
if not confirm("Clear legacy installation?"):
return
deployer = Deployer()
try:
self.deref_backup_syms(legacy_dir)
for path in to_delete:
deployer.remove(path)
info(f"Deleted {path}")
if meta_installed:
log("Removing legacy meta package...")
installer.remove([LEGACY_META_PKG])
except (OSError, PackageError) as e:
warn(f"could not fully clear the legacy installation: {e}")
def print_done(self) -> None:
print()
info("All done! Caelestia has been installed.")
info("A few things to finish up:")
info(" - A reboot is recommended for all changes take effect")
info(" - Edit `~/.config/caelestia/hypr-vars.conf` to set default apps, keybinds and much more")
info(" - Edit `~/.config/caelestia/hypr-user.conf` to set your monitor layout and other Hyprland configs")
info(" - Run `caelestia update` later to pull in the latest changes")
info("Enjoy! For support (or to just hang out), join our Discord server: https://discord.gg/BGDCFCmMBk")
+4 -7
View File
@@ -1,4 +1,3 @@
import json
import re
import shutil
import subprocess
@@ -9,7 +8,7 @@ from pathlib import Path
from caelestia.utils import hypr
from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir, user_config_path
from caelestia.utils.paths import get_config, recording_notif_path, recording_path, recordings_dir
RECORDER = "gpu-screen-recorder"
@@ -65,12 +64,10 @@ class Command:
if self.args.sound:
args += ["-a", "default_output"]
config = get_config()
try:
config = json.loads(user_config_path.read_text())
if "record" in config and "extraArgs" in config["record"]:
args += config["record"]["extraArgs"]
except (json.JSONDecodeError, FileNotFoundError):
pass
except TypeError as e:
raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}")
@@ -123,7 +120,7 @@ class Command:
)
if action == "watch":
subprocess.Popen(["app2unit", "-O", new_path], start_new_session=True)
subprocess.Popen(["xdg-open", new_path], start_new_session=True)
elif action == "open":
p = subprocess.run(
[
@@ -138,6 +135,6 @@ class Command:
]
)
if p.returncode != 0:
subprocess.Popen(["app2unit", "-O", new_path.parent], start_new_session=True)
subprocess.Popen(["xdg-open", new_path.parent], start_new_session=True)
elif action == "delete":
new_path.unlink()
+71 -55
View File
@@ -7,8 +7,8 @@ from pathlib import Path
from typing import Any, Dict, Optional
from caelestia.utils import hypr
from caelestia.utils.logging import log_message
from caelestia.utils.paths import user_config_path
from caelestia.utils.io import error, fatal, info, log, warn
from caelestia.utils.paths import get_config
class WindowRule:
@@ -26,14 +26,34 @@ class Command:
self.timeout_tracker: dict[str, float] = {}
self.window_rules = self._load_window_rules()
def _make_resize_cmd(self, width: int | str, height: int | str, address: str) -> str:
if hypr.is_lua_config():
return f'dispatch hl.dsp.window.resize({{x = {width}, y = {height}, exact = true, window = "address:{address}"}})'
return f"dispatch resizewindowpixel exact {width} {height},address:{address}"
def _make_move_cmd(self, x: int, y: int, address: str) -> str:
if hypr.is_lua_config():
return f'dispatch hl.dsp.window.move({{x = {x}, y = {y}, window = "address:{address}"}})'
return f"dispatch movewindowpixel exact {x} {y},address:{address}"
def _make_float_cmd(self, address: str) -> str:
if hypr.is_lua_config():
return f'dispatch hl.dsp.window.float({{action = "toggle", window = "address:{address}"}})'
return f"dispatch togglefloating address:{address}"
def _make_center_cmd(self) -> str:
if hypr.is_lua_config():
return "dispatch hl.dsp.window.center()"
return "dispatch centerwindow"
def _load_window_rules(self) -> list[WindowRule]:
default_rules = [
WindowRule("(Bitwarden", "titleContains", "20%", "54%", ["float", "center"]),
WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]),
]
config = get_config()
try:
config = json.loads(user_config_path.read_text())
if "resizer" in config and "rules" in config["resizer"]:
rules = []
for rule_config in config["resizer"]["rules"]:
@@ -47,8 +67,8 @@ class Command:
)
)
return rules
except (json.JSONDecodeError, KeyError):
log_message("ERROR: invalid config")
except KeyError:
warn("invalid config, falling back to default rules")
except FileNotFoundError:
pass
@@ -164,16 +184,14 @@ class Command:
move_x = monitor_x + monitor_width - scaled_width - offset
move_y = monitor_y + monitor_height - scaled_height - offset
command1 = f"dispatch resizewindowpixel exact {scaled_width} {scaled_height},address:{address}"
command2 = f"dispatch movewindowpixel exact {int(move_x)} {int(move_y)},address:{address}"
command1 = self._make_resize_cmd(scaled_width, scaled_height, address)
command2 = self._make_move_cmd(int(move_x), int(move_y), address)
hypr.batch(command1, command2)
log_message(
f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})"
)
info(f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})")
except Exception as e:
log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}")
error(f"failed to apply PiP action to window 0x{window_id}: {e}")
def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool:
dispatch_commands = []
@@ -181,23 +199,23 @@ class Command:
if "float" in actions:
window_info = self._get_window_info(window_id)
if window_info and not window_info.get("floating", False):
dispatch_commands.append(f"dispatch togglefloating address:0x{window_id}")
dispatch_commands.append(self._make_float_cmd(f"0x{window_id}"))
if "pip" in actions:
self._apply_pip_action(window_id)
return True
dispatch_commands.append(f"dispatch resizewindowpixel exact {width} {height},address:0x{window_id}")
dispatch_commands.append(self._make_resize_cmd(width, height, f"0x{window_id}"))
if "center" in actions:
dispatch_commands.append("dispatch centerwindow")
dispatch_commands.append(self._make_center_cmd())
try:
hypr.batch(*dispatch_commands)
log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
info(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
return True
except Exception as e:
log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}")
error(f"failed to apply window actions for window 0x{window_id}: {e}")
return False
def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None:
@@ -216,7 +234,7 @@ class Command:
if re.search(rule.name, window_title):
return rule
except re.error:
log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'")
warn(f"invalid regex pattern in rule '{rule.name}'")
return None
@@ -238,7 +256,7 @@ class Command:
window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}")
warn(f"invalid window ID format: {window_id}")
return
window_info = self._get_window_info(window_id)
@@ -248,19 +266,19 @@ class Command:
window_title = window_info.get("title", "")
initial_title = window_info.get("initialTitle", "")
log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
log(f"Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
rule = self._match_window_rule(window_title, initial_title)
if rule:
if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}")
log(f"Rate limited: skipping window 0x{window_id}")
return
log_message(f"Matched rule '{rule.name}' for window 0x{window_id}")
info(f"Matched rule '{rule.name}' for window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window title event: {e}")
warn(f"failed to parse window title event: {e}")
def _handle_open_event(self, event: str) -> None:
try:
@@ -276,22 +294,22 @@ class Command:
window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}")
warn(f"invalid window ID format: {window_id}")
return
log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
log(f"New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
rule = self._match_window_rule(title, title)
if rule:
if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}")
log(f"Rate limited: skipping window 0x{window_id}")
return
log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}")
info(f"Matched rule '{rule.name}' for new window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window open event: {e}")
warn(f"failed to parse window open event: {e}")
def run(self) -> None:
if self.args.daemon:
@@ -304,7 +322,7 @@ class Command:
):
self._run_active_mode()
else:
print(
info(
"Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode"
)
@@ -313,28 +331,27 @@ class Command:
try:
active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found")
error("no active window found")
return
address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address")
error("invalid window address")
return
window_id = address[2:] # Remove "0x" prefix
window_title = active_window_result.get("title", "")
if not active_window_result.get("floating", False):
print(f"Window '{window_title}' is not floating. PIP only works on floating windows.")
print("Try making it floating first with: hyprctl dispatch togglefloating")
warn(f"window '{window_title}' is not floating; PiP only works on floating windows.")
return
print(f"Applying PIP to active window: '{window_title}'")
info(f"Applying PiP to active window: '{window_title}'")
self._apply_pip_action(window_id)
print("PIP applied successfully")
info("PiP applied successfully")
except Exception as e:
print(f"ERROR: Failed to apply PIP to active window: {e}")
error(f"failed to apply PiP to active window: {e}")
def _run_active_mode(self) -> None:
try:
@@ -351,10 +368,10 @@ class Command:
matching_windows = self._find_matching_windows(temp_rule)
if not matching_windows:
print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
warn(f"no windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
return
print(f"Found {len(matching_windows)} matching window(s)")
info(f"Found {len(matching_windows)} matching window(s)")
# Apply rule to all matching windows
success_count = 0
@@ -362,41 +379,41 @@ class Command:
window_id = window["address"][2:] # Remove "0x" prefix
window_title = window.get("title", "")
print(f"Applying rule to window 0x{window_id}: '{window_title}'")
info(f"Applying rule to window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success:
success_count += 1
print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
info(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
except Exception as e:
print(f"ERROR: Failed to apply rule: {e}")
error(f"failed to apply rule: {e}")
def _apply_to_active_window(self, temp_rule: WindowRule) -> None:
"""Apply rule only to the currently active window"""
try:
active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found")
error("no active window found")
return
window_title = active_window_result.get("title", "")
address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address")
error("invalid window address")
return
window_id = address[2:] # Remove "0x" prefix
print(f"Applying rule to active window 0x{window_id}: '{window_title}'")
info(f"Applying rule to active window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success:
print("Rule applied successfully")
info("Rule applied successfully")
else:
print("Failed to apply rule")
error("failed to apply rule")
except Exception as e:
print(f"ERROR: Failed to apply rule to active window: {e}")
error(f"failed to apply rule to active window: {e}")
def _find_matching_windows(self, temp_rule: WindowRule) -> list:
"""Find all windows that match the given rule pattern"""
@@ -425,7 +442,7 @@ class Command:
try:
matches = bool(re.search(temp_rule.name, window_title))
except re.error:
print(f"ERROR: Invalid regex pattern '{temp_rule.name}'")
warn(f"invalid regex pattern '{temp_rule.name}'")
return []
if matches:
@@ -434,23 +451,22 @@ class Command:
return matching_windows
except Exception as e:
print(f"ERROR: Failed to find matching windows: {e}")
error(f"failed to find matching windows: {e}")
return []
def _run_daemon(self) -> None:
log_message("Hyprland window resizer started")
log_message(f"Loaded {len(self.window_rules)} window rules")
info("Hyprland window resizer started")
info(f"Loaded {len(self.window_rules)} window rules")
socket_path = Path(hypr.socket2_path)
if not socket_path.exists():
log_message(f"ERROR: Hyprland socket not found at {socket_path}")
return
fatal(f"Hyprland socket not found at {socket_path}")
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(hypr.socket2_path)
log_message("Connected to Hyprland socket, listening for events...")
info("Connected to Hyprland socket, listening for events...")
while True:
data = sock.recv(4096).decode()
@@ -460,6 +476,6 @@ class Command:
self._handle_window_event(line)
except KeyboardInterrupt:
log_message("Resizer daemon stopped")
info("Resizer daemon stopped")
except Exception as e:
log_message(f"ERROR: {e}")
error(str(e))
+7 -1
View File
@@ -2,6 +2,7 @@ import subprocess
from argparse import Namespace
from datetime import datetime
from caelestia.utils import hypr
from caelestia.utils.notify import notify
from caelestia.utils.paths import screenshots_cache_dir, screenshots_dir
@@ -33,7 +34,12 @@ class Command:
swappy.stdin.close()
def fullscreen(self) -> None:
sc_data = subprocess.check_output(["grim", "-"])
cmd = ["grim"]
focused_monitor = next(monitor for monitor in hypr.message("monitors") if monitor["focused"])
if focused_monitor:
cmd += ["-o", focused_monitor["name"]]
cmd += ["-"]
sc_data = subprocess.check_output(cmd)
subprocess.run(["wl-copy"], input=sc_data)
+4 -4
View File
@@ -6,7 +6,7 @@ from collections import ChainMap
from typing import Any, Callable, cast
from caelestia.utils import hypr
from caelestia.utils.paths import user_config_path
from caelestia.utils.paths import get_config
def is_subset(superset, subset):
@@ -103,8 +103,8 @@ class Command:
},
}
try:
self.cfg = DeepChainMap(json.loads(user_config_path.read_text())["toggles"], self.cfg)
except (FileNotFoundError, json.JSONDecodeError, KeyError):
self.cfg = DeepChainMap(get_config()["toggles"], self.cfg)
except KeyError:
pass
def run(self) -> None:
@@ -135,7 +135,7 @@ class Command:
if (spawn[0].endswith(".desktop") or shutil.which(spawn[0])) and not any(
selector(client) for client in self.get_clients()
):
hypr.dispatch("exec", f"[workspace special:{self.args.workspace}] app2unit -- {shlex.join(spawn)}")
hypr.dispatch("exec", f"[workspace special:{self.args.workspace}] {shlex.join(spawn)}")
return True
else:
return False
+260
View File
@@ -0,0 +1,260 @@
import sys
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.diff import Changeset
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError
from caelestia.utils.dots.misc import build_local_packages, run_hooks
from caelestia.utils.dots.packages import PackageError, PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import disable_input, fatal, info, log, prompt_selection, warn
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.noconfirm:
disable_input()
state = DotsState.load()
if state.applied_rev is None:
fatal("dots not installed yet. Run `caelestia install` first.")
# Run system update
try:
installer = PackageInstaller.get(self.args.aur_helper or state.aur_helper, self.args.noconfirm)
installer.system_update()
except PackageError as e:
fatal(e)
# Get manifest or exit if up to date
source, tip, manifest = self.fetch_manifest(state, state.applied_rev)
# Apply file changes
entries = manifest.enabled_entries()
try:
changeset = Changeset.compute(source, state.applied_rev, tip, entries, state.deployed_files)
source.checkout_tip()
except SourceError as e:
fatal(e)
new_files, revived_files, placed = self.deploy_changeset(source, changeset)
# Persist file changes immediately so a later failure can't lose track of them
deployed = dict(state.deployed_files)
for dest in (*changeset.deletes, *changeset.stale, *changeset.untracked):
deployed.pop(str(dest), None)
for repofile, dest in changeset.remap:
deployed[str(dest)] = repofile
deployed.update(placed)
state.deployed_files = deployed
state.save()
# Install new/remove old packages
desired = manifest.enabled_packages()
desired_local = manifest.enabled_local_packages()
try:
state.packages = self.sync_packages(installer, state.packages, desired)
state.save()
state.local_packages = self.sync_local_packages(installer, source, state.local_packages, desired_local)
state.save()
except PackageError as e:
fatal(e)
# Run hooks
run_hooks(manifest, "post_update")
# Mark the new revision applied
state.applied_rev = tip
state.enabled_components = manifest.enabled_components
state.aur_helper = getattr(installer, "helper", state.aur_helper)
state.save()
self.summarize(changeset, new_files, revived_files)
def fetch_manifest(self, state: DotsState, applied_rev: str) -> tuple[DotsSource, str, Manifest]:
print()
log("Fetching dots repo...")
source = DotsSource()
try:
source.ensure()
tip = source.tip_rev()
if tip == applied_rev:
info("Dots already up to date.")
sys.exit(0)
manifest = source.manifest_at(tip)
if source.has_rev(applied_rev):
known = set(source.manifest_at(applied_rev).components)
else:
# Treat all components as known if rev is invalid so we don't overwrite existing prefs
known = set(manifest.components)
except (SourceError, ManifestError) as e:
fatal(e)
# Enable components recorded at install time + any new components that are default on
enabled = [
name
for name, comp in manifest.components.items()
if name in state.enabled_components or (name not in known and comp.default)
]
# Let the user opt into any new optional components
new_comps = [name for name, comp in manifest.components.items() if name not in known and not comp.default]
if new_comps:
info(f"New components: {', '.join(new_comps)}")
enabled += prompt_selection(new_comps, "Components to enable?")
disabled = [name for name in manifest.components if name not in enabled]
try:
manifest.resolve_components(enable=enabled, disable=disabled)
except ComponentError as e:
fatal(e)
info(f"Enabled components: {', '.join(enabled) or 'none'}")
return source, tip, manifest
def deploy_changeset(
self, source: DotsSource, changeset: Changeset
) -> tuple[list[Path], list[Path], dict[str, str]]:
print()
if changeset.is_empty():
info("No configs to update.")
return [], [], {}
log("Updating configs...")
deployer = Deployer()
for repofile, dest in changeset.place:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
deployer.place_file(src, dest)
info(f"{repofile} -> {dest}")
new_files = []
for repofile, dest in changeset.conflicts:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
new_path = deployer.write_new(src, dest)
new_files.append(new_path)
warn(f"{dest} has local changes; upstream version written as {new_path.name}")
revived_files = []
for repofile, dest in changeset.deleted_changed:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
new_path = deployer.write_new(src, dest)
revived_files.append(new_path)
warn(f"{dest} was removed but changed upstream; upstream version written as {new_path.name}")
for dest in changeset.deletes:
deployer.remove(dest)
deployer.prune_empty_dirs(dest, Path.home())
info(f"Removed {dest}")
return new_files, revived_files, deployer.deployed_files
def sync_packages(self, installer: PackageInstaller, current: list[str], desired: list[str]) -> list[str]:
to_install = [p for p in desired if p not in current]
to_remove = [p for p in current if p not in desired]
installed = list(current)
if to_install:
print()
info(f"Installing new packages: {', '.join(to_install)}")
installer.install(to_install)
installed.extend(p for p in to_install if p not in installed)
if to_remove:
print()
info(f"Packages no longer required: {', '.join(to_remove)}")
selected = prompt_selection(to_remove, "Packages to remove?")
if selected:
installer.remove(selected)
installed = [p for p in installed if p not in selected]
return installed
def sync_local_packages(
self, installer: PackageInstaller, source: DotsSource, current: dict[str, list[str]], desired: list[str]
) -> dict[str, list[str]]:
to_build = [p for p in desired if p not in current]
to_rebuild = self.outdated_local_packages(installer, source, current, desired)
to_remove = [p for p in current if p not in desired]
installed = dict(current)
if to_build:
print()
log(f"Building new local packages: {', '.join(to_build)}")
installed.update(build_local_packages(installer, source, to_build))
if to_rebuild:
print()
log(f"Rebuilding updated local packages: {', '.join(to_rebuild)}")
installed.update(build_local_packages(installer, source, to_rebuild))
if to_remove:
print()
info(f"Local packages no longer required: {', '.join(to_remove)}")
selected = prompt_selection(to_remove, "Local packages to remove?")
if selected:
installer.remove([pkg for path in selected for pkg in current[path]])
for path in selected:
installed.pop(path, None)
return installed
def outdated_local_packages(
self, installer: PackageInstaller, source: DotsSource, current: dict[str, list[str]], desired: list[str]
) -> list[str]:
"""Repo paths whose installed packages are older than what the repo would build (skipped when off Arch)."""
outdated = []
for path in desired:
if path not in current:
continue
directory = source.working_path(path)
if not directory.is_dir():
continue
try:
if installer.needs_rebuild(directory, current[path]):
outdated.append(path)
except PackageError as e:
# Failed to read PKGBUILD, leave it as-is
warn(f"could not check {path} for updates, leaving as-is: {e}")
return outdated
def summarize(self, changeset: Changeset, new_files: list[Path], revived_files: list[Path]) -> None:
print()
conflicts = len(new_files) + len(revived_files)
info(f"Updated {len(changeset.place)} file(s), removed {len(changeset.deletes)}, {conflicts} conflict(s).")
if new_files:
info("The following files were changed upstream but you had edited them locally.")
info("Your versions were kept; the upstream versions were written alongside as .new:")
for path in new_files:
info(f" {path}")
if revived_files:
info("These files were removed by you but changed upstream, so were not restored.")
info("The upstream versions were written alongside as .new:")
for path in revived_files:
info(f" {path}")
if changeset.stale:
info("These files are no longer managed but differ from what was installed, so were kept:")
for path in changeset.stale:
info(f" {path}")
+84
View File
@@ -0,0 +1,84 @@
import shutil
import tempfile
from pathlib import Path
from caelestia.utils.paths import cache_dir, config_dir, data_dir, dots_dir, state_dir
# Dirs to never prune even if empty
_PROTECTED_DIRS = frozenset({Path.home(), config_dir, data_dir, state_dir, cache_dir})
class Deployer:
"""Places files from the dots clone into their destinations."""
def __init__(self):
self.deployed_files: dict[str, str] = {}
def place(self, src: Path, dest: Path) -> None:
"""Place a whole entry (file or directory tree), replacing any existing dest."""
if src.is_dir():
self.place_dir(src, dest)
else:
self.place_file(src, dest)
def place_dir(self, src: Path, dest: Path) -> None:
"""Place a directory tree recursively, overwriting any existing dest files."""
if dest.is_symlink() or dest.is_file():
self.remove(dest)
dest.mkdir(parents=True, exist_ok=True)
for path in src.rglob("*"):
if path.is_file():
self.place_file(path, dest / path.relative_to(src))
elif path.is_dir():
(dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True)
def place_file(self, src: Path, dest: Path, record: bool = True) -> None:
"""Atomically place a single file, replacing any existing dest."""
if dest.is_dir() and not dest.is_symlink():
self.remove(dest)
dest.parent.mkdir(parents=True, exist_ok=True)
f = tempfile.NamedTemporaryFile(dir=dest.parent, delete=False)
f.close()
try:
shutil.copyfile(src, f.name)
shutil.copymode(src, f.name)
Path(f.name).replace(dest)
except BaseException:
Path(f.name).unlink()
raise
if record:
# Keep relative to dots dir
self.deployed_files[str(dest)] = str(src.relative_to(dots_dir))
def write_new(self, src: Path, dest: Path) -> Path:
"""Write the upstream version alongside dest as <dest>.new and return that path."""
new_path = dest.parent / f"{dest.name}.new"
self.place_file(src, new_path, record=False)
return new_path
def remove(self, path: Path) -> None:
if path.is_symlink() or path.is_file():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
def prune_empty_dirs(self, start: Path, stop: Path) -> None:
"""Removes dirs recursively from start to stop.
Will never prune protected dirs (home, config, cache, etc).
"""
parent = start.parent
while parent != stop and stop in parent.parents and parent not in _PROTECTED_DIRS:
try:
parent.rmdir()
except OSError:
break
parent = parent.parent
+153
View File
@@ -0,0 +1,153 @@
from dataclasses import dataclass, field
from pathlib import Path
from caelestia.utils.dots.manifest import ManifestEntry
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.io import warn
class _Continue(Exception):
"""Signals the deployed-files loop to skip to the next entry."""
def _read_local(path: Path) -> bytes | None:
"""Read a local file, returning None if it can't be read (perms, is a dir, etc.)."""
try:
return path.read_bytes()
except OSError:
return None
@dataclass(frozen=True)
class Changeset:
place: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) to fast-forward
conflicts: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) -> write .new
deletes: list[Path] = field(default_factory=list) # We placed it, upstream removed it, unmodified
stale: list[Path] = field(default_factory=list) # Upstream removed it but user modified it
deleted_changed: list[tuple[str, Path]] = field(default_factory=list) # User deleted it, upstream changed -> .new
untracked: list[Path] = field(default_factory=list) # Gone + no longer managed; drop from state
remap: list[tuple[str, Path]] = field(default_factory=list) # Up to date but source path moved; restate mapping
def is_empty(self) -> bool:
return not (self.place or self.conflicts or self.deletes or self.stale or self.deleted_changed)
@staticmethod
def compute(
source: DotsSource,
applied_rev: str,
tip: str,
entries: list[ManifestEntry],
deployed: dict[str, str],
) -> "Changeset":
"""Collect all file changes needed into a Changeset."""
has_base = source.has_rev(applied_rev)
if not has_base:
warn(
"the previously applied revision is missing from the dots clone; files that differ "
"from the latest version will be written as .new instead of updated in place."
)
changed = set(source.changed_files(applied_rev, tip)) if has_base else set()
place: list[tuple[str, Path]] = []
conflicts: list[tuple[str, Path]] = []
deletes: list[Path] = []
stale: list[Path] = []
deleted_changed: list[tuple[str, Path]] = []
untracked: list[Path] = []
remap: list[tuple[str, Path]] = []
# Collect all files to deploy (entry sources can be dirs so we recurse into them)
to_deploy: dict[Path, str] = {}
for entry in entries:
src_root = str(entry.expanded_src())
repo_files = source.files_at(tip, src_root)
for dest in entry.expanded_dests():
for repo_file in repo_files:
to_deploy[dest / Path(repo_file).relative_to(src_root)] = repo_file
files_to_deploy = set(to_deploy)
# Already deployed files
for dest, src in deployed.items():
dest_path = Path(dest)
def try_read(rev: str, path: str) -> bytes:
try:
return source.blob_at(rev, path)
except SourceError:
# Read failed, keep it just in case
stale.append(dest_path)
raise _Continue
try:
if dest_path not in files_to_deploy: # No longer managed by any entry
if not dest_path.exists():
# Gone from disk and no entry manages it
untracked.append(dest_path)
continue
local = _read_local(dest_path)
if local is not None and has_base and try_read(applied_rev, src) == local:
deletes.append(dest_path)
else:
# Modified, or unreadable so we can't verify; keep it just in case
stale.append(dest_path)
else: # Still managed; `src` is what we last placed, `new_src` the current source
new_src = to_deploy[dest_path]
if not dest_path.exists():
# User deleted a managed file locally
if has_base and new_src == src and new_src not in changed:
continue # Respect the deletion; upstream has nothing new to offer
# Upstream changed it (or base is unknown): surface as .new, don't restore
deleted_changed.append((new_src, dest_path))
continue
if has_base and new_src == src and new_src not in changed:
continue # Unchanged upstream
dest_content = _read_local(dest_path)
if dest_content is None:
# Unreadable (perms, became a dir, ...); surface upstream as .new, don't clobber
conflicts.append((new_src, dest_path))
continue
if try_read(tip, new_src) == dest_content:
# Already up to date; restate the mapping if the source path moved
if new_src != src:
remap.append((new_src, dest_path))
continue
# Fast-forward only when the user hasn't edited since last deploy
if has_base and try_read(applied_rev, src) == dest_content:
place.append((new_src, dest_path))
else:
conflicts.append((new_src, dest_path))
except _Continue:
continue
# New files to deploy
for dest in files_to_deploy - set(Path(d) for d in deployed):
src = to_deploy[dest]
try:
new_content = source.blob_at(tip, src)
except SourceError:
# Failed to read the upstream blob; skip rather than abort the whole update
warn(f"could not read from source, skipping: {src}")
continue
if not dest.exists() or new_content == _read_local(dest):
# Dest nonexistent or already equal to new content
place.append((src, dest))
else:
# Differs, or exists but unreadable; surface upstream as .new
conflicts.append((src, dest))
return Changeset(
place=place,
conflicts=conflicts,
deletes=deletes,
stale=stale,
deleted_changed=deleted_changed,
untracked=untracked,
remap=remap,
)
+94
View File
@@ -0,0 +1,94 @@
import subprocess
from pathlib import Path
from caelestia.utils.paths import config_dir, data_dir
LEGACY_META_PKG = "caelestia-meta"
_confs = [
"hypr",
"starship.toml",
"foot",
"fish",
"fastfetch",
"uwsm",
"btop",
"spicetify",
"Code/User/settings.json",
"VSCodium/User/settings.json",
"Code/User/keybindings.json",
"VSCodium/User/keybindings.json",
"code-flags.conf",
"codium-flags.conf",
]
def _find_legacy_repo(path: Path) -> Path | None:
try:
remote = subprocess.check_output(["git", "-C", path, "remote", "get-url", "origin"], text=True)
except subprocess.CalledProcessError:
return
# Check remote
if remote.strip() != "https://github.com/caelestia-dots/caelestia.git":
return
# Ignore anything outside home
if Path.home() not in path.parents:
return
# Walk up parents (capped at home) to find the repo root
while path != Path.home() and not (path / ".git").is_dir():
path = path.parent
# Only return path if didn't hit home (we really don't want to nuke home)
if path != Path.home():
return path
def _filter_candidates(candidates: list[Path], legacy_dir: Path) -> list[Path]:
return [path for path in candidates if path.is_symlink() and legacy_dir in path.resolve().parents]
def detect_legacy_repo() -> Path | None:
for conf in _confs:
path = config_dir / conf
if not path.is_symlink():
continue
legacy_dir = _find_legacy_repo(path.resolve())
if legacy_dir:
return legacy_dir
return _find_legacy_repo(data_dir / "caelestia")
def legacy_config_symlinks(base: Path, legacy_dir: Path | None) -> list[Path]:
"""Config-relative links install.fish created, resolved under `base` (the live config or a backup of it)."""
if not legacy_dir:
return []
candidates = [base / conf for conf in _confs]
return _filter_candidates(candidates, legacy_dir)
def legacy_symlinks(legacy_dir: Path | None) -> list[Path]:
"""All paths symlinked into the legacy repo (the links install.fish created)."""
if not legacy_dir:
return []
extras = [
*(Path.home() / ".zen").glob("*/chrome/userChrome.css"),
Path.home() / ".local/lib/caelestia/caelestiafox",
]
return [*legacy_config_symlinks(config_dir, legacy_dir), *_filter_candidates(extras, legacy_dir)]
def legacy_to_delete(legacy_dir: Path | None) -> list[Path]:
if not legacy_dir:
return []
return [*legacy_symlinks(legacy_dir), legacy_dir]
+231
View File
@@ -0,0 +1,231 @@
import glob
import os
import re
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from string import Template
from typing import Any
from caelestia.utils.io import warn
_XDG_DEFAULTS = {
"XDG_CONFIG_HOME": str(Path.home() / ".config"),
"XDG_DATA_HOME": str(Path.home() / ".local/share"),
"XDG_STATE_HOME": str(Path.home() / ".local/state"),
"XDG_CACHE_HOME": str(Path.home() / ".cache"),
}
_GLOB_MAGIC = re.compile(r"[*?[]")
_LOCAL_PREFIX = "local:"
class ManifestError(Exception):
"""Raised when manifest.toml is malformed."""
class ComponentError(Exception):
"""Raised when component flags are invalid or contradictory."""
def _expand(text: str) -> Path:
"""Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path."""
env = {**_XDG_DEFAULTS, **os.environ}
return Path(Template(text).safe_substitute(env)).expanduser()
@dataclass(frozen=True)
class ManifestEntry:
src: str
dest: str
def expanded_src(self) -> Path:
return _expand(self.src)
def expanded_dests(self) -> list[Path]:
"""The dest path with globs expanded.
Globs from the start until the segment with the last glob so subdirs are
created if they didn't exist previously.
"""
expanded = _expand(self.dest)
if not _GLOB_MAGIC.search(str(expanded)):
return [expanded]
parts = expanded.parts
glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part))
pattern = str(Path(*parts[: glob_idx + 1]))
tail = parts[glob_idx + 1 :]
matches = sorted(glob.glob(pattern))
if tail: # Only match dirs if a tail exists
matches = [match for match in matches if Path(match).is_dir()]
return [Path(match, *tail) for match in matches]
@dataclass(frozen=True)
class ManifestComponent:
name: str
default: bool = False
packages: list[str] = field(default_factory=list)
entries: list[ManifestEntry] = field(default_factory=list)
post_package: list[str] = field(default_factory=list)
post_install: list[str] = field(default_factory=list)
post_update: list[str] = field(default_factory=list)
@dataclass
class _ManifestData:
enabled_comps: list[str] = field(default_factory=list)
disabled_comps: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class Manifest:
components: dict[str, ManifestComponent] = field(default_factory=dict)
packages: list[str] = field(default_factory=list)
post_package: list[str] = field(default_factory=list) # Post package install (install cmd only)
post_install: list[str] = field(default_factory=list) # Very end of install cmd
post_update: list[str] = field(default_factory=list) # Very end of update cmd
_data: _ManifestData = field(default_factory=_ManifestData, init=False, repr=False)
@property
def enabled_components(self) -> list[str]:
return self._data.enabled_comps
@property
def disabled_components(self) -> list[str]:
return self._data.disabled_comps
@staticmethod
def parse(text: str) -> "Manifest":
try:
raw = tomllib.loads(text)
except tomllib.TOMLDecodeError as e:
raise ManifestError(f"invalid TOML: {e}") from e
post_package = _validate_str_list(raw.get("post_package", []), "post_package")
post_install = _validate_str_list(raw.get("post_install", []), "post_install")
post_update = _validate_str_list(raw.get("post_update", []), "post_update")
packages = _validate_str_list(raw.get("packages", []), "packages")
components = {}
for comp in raw.get("components", []):
parsed = _parse_component(comp)
if parsed.name in components:
warn(f"duplicate component '{parsed.name}'; using the last definition")
components[parsed.name] = parsed
return Manifest(
components=components,
packages=packages,
post_package=post_package,
post_install=post_install,
post_update=post_update,
)
def resolve_components(
self,
enable: list[str] | None = None,
disable: list[str] | None = None,
) -> None:
"""Resolves enabled/disabled components. This MUST be called before calling any other method."""
enable_set = set(enable or [])
disable_set = set(disable or [])
known = set(self.components)
for name in enable_set | disable_set:
if name not in known:
raise ComponentError(f"unknown component: {name}")
conflict = enable_set & disable_set
if conflict:
raise ComponentError(f"component(s) both enabled and disabled: {', '.join(sorted(conflict))}")
enabled = {name for name, comp in self.components.items() if comp.default}
enabled |= enable_set
enabled -= disable_set
self._data.enabled_comps.clear()
self._data.disabled_comps.clear()
for name in self.components:
if name in enabled:
self._data.enabled_comps.append(name)
else:
self._data.disabled_comps.append(name)
def enabled_entries(self) -> list[ManifestEntry]:
"""The entries of every enabled component."""
entries: list[ManifestEntry] = []
for name in self._data.enabled_comps:
entries.extend(self.components[name].entries)
return entries
def enabled_hooks(self, kind: str) -> list[str]:
"""Global + enabled components' hooks of the given kind."""
hooks = list(getattr(self, kind))
for name in self._data.enabled_comps:
hooks.extend(getattr(self.components[name], kind))
return hooks
def enabled_packages(self) -> list[str]:
"""Repo/AUR packages to install."""
return [p for p in self._all_packages() if not p.startswith(_LOCAL_PREFIX)]
def enabled_local_packages(self) -> list[str]:
"""Local PKGBUILD dirs to build.
Local packages are determined by a local: prefix and are
relative dirs instead of package names.
"""
return [p[len(_LOCAL_PREFIX) :] for p in self._all_packages() if p.startswith(_LOCAL_PREFIX)]
def _all_packages(self) -> list[str]:
"""The manifest's top-level packages plus enabled components', in manifest order.
Top-level packages come first, then each enabled component's packages in
component order. Only the first occurrence of each package is kept.
"""
seen: set[str] = set()
ordered: list[str] = []
for pkg in (*self.packages, *(p for c in self._data.enabled_comps for p in self.components[c].packages)):
if pkg not in seen:
seen.add(pkg)
ordered.append(pkg)
return ordered
def _require_key(d: dict[str, Any], key: str, ctx: str) -> Any:
if key not in d:
raise ManifestError(f"{ctx}: missing required key '{key}'")
return d[key]
def _validate_str_list(value: Any, ctx: str) -> list[str]:
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
raise ManifestError(f"{ctx}: expected a list of strings")
return value
def _parse_entry(d: Any) -> ManifestEntry:
if not isinstance(d, dict):
raise ManifestError("entry: expected a table")
return ManifestEntry(src=_require_key(d, "src", "entry"), dest=_require_key(d, "dest", "entry"))
def _parse_component(d: dict[str, Any]) -> ManifestComponent:
name = _require_key(d, "name", "component")
return ManifestComponent(
name=name,
default=bool(d.get("default", False)),
packages=_validate_str_list(d.get("packages", []), f"component '{name}' packages"),
entries=[_parse_entry(e) for e in d.get("entries", [])],
post_package=_validate_str_list(d.get("post_package", []), f"component '{name}' post_package"),
post_install=_validate_str_list(d.get("post_install", []), f"component '{name}' post_install"),
post_update=_validate_str_list(d.get("post_update", []), f"component '{name}' post_update"),
)
+39
View File
@@ -0,0 +1,39 @@
import os
import subprocess
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.packages import PackageInstaller
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import info, log, warn
from caelestia.utils.paths import dots_dir
def build_local_packages(installer: PackageInstaller, source: DotsSource, paths: list[str]) -> dict[str, list[str]]:
"""Build and install each local PKGBUILD dir, returning {path: installed package names}."""
built: dict[str, list[str]] = {}
for path in paths:
directory = source.working_path(path)
if not directory.is_dir():
warn(f"missing in repo, skipping: {path}")
continue
log(f"Building {path}...")
built[path] = installer.build_install(directory)
return built
def run_hooks(manifest: Manifest, kind: str) -> None:
"""Run the global + enabled components' hooks of the given kind (e.g. "post_install")."""
hooks = manifest.enabled_hooks(kind)
if not hooks:
return
print()
log(f"Running {kind.replace('_', '-')} hooks...")
env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)}
for hook in hooks:
info(f"Running hook: {hook}")
result = subprocess.run(hook, shell=True, env=env)
if result.returncode != 0:
warn(f"hook exited with {result.returncode}")
+260
View File
@@ -0,0 +1,260 @@
import os
import shutil
import subprocess
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from caelestia.utils.io import fatal, info, warn
DEFAULT_AUR_HELPER = "paru"
AUR_HELPERS = DEFAULT_AUR_HELPER, "yay"
class PackageError(Exception):
"""Raised when a package operation (install/remove/build/update) fails."""
def _try_run(cmd: list[str], error_msg: str, **kwargs) -> None:
"""Run a subprocess, raising `PackageError` if it fails."""
try:
subprocess.run(cmd, check=True, **kwargs)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PackageError(error_msg) from e
def _read_srcinfo(directory: Path) -> dict[str, list[str]]:
"""Run `makepkg --printsrcinfo` in `directory`, grouping each key to its list of values."""
try:
srcinfo = subprocess.check_output(["makepkg", "--printsrcinfo"], cwd=directory, text=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PackageError(f"failed to read package metadata in {directory}") from e
fields: dict[str, list[str]] = {}
for line in srcinfo.splitlines():
key, sep, value = line.partition("=")
if not sep:
continue
fields.setdefault(key.strip(), []).append(value.strip())
return fields
def _srcinfo_version(fields: dict[str, list[str]]) -> str | None:
"""Build the `[epoch:]pkgver-pkgrel` version string from parsed .SRCINFO fields, or None if absent."""
pkgver = next(iter(fields.get("pkgver", [])), None)
pkgrel = next(iter(fields.get("pkgrel", [])), None)
if pkgver is None or pkgrel is None:
return None
version = f"{pkgver}-{pkgrel}"
epoch = next(iter(fields.get("epoch", [])), None)
return f"{epoch}:{version}" if epoch else version
def _vercmp(a: str, b: str) -> int:
"""Use pacman's `vercmp` to compare to package versions."""
try:
return int(subprocess.check_output(["vercmp", a, b], text=True).strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
warn(f"vercmp failed, assuming equal: {e}")
return 0 # Don't rebuild when unable to check version
def _install_aur_helper(helper: str, noconfirm: bool = False) -> None:
pacman_cmd = ["sudo", "pacman", "-S", "--needed", "git", "base-devel"]
if noconfirm:
pacman_cmd.append("--noconfirm")
_try_run(pacman_cmd, "failed to install AUR helper build dependencies")
repo_url = f"https://aur.archlinux.org/{helper}.git"
with tempfile.TemporaryDirectory() as repo_dir:
_try_run(["git", "clone", repo_url, repo_dir], f"failed to clone {helper} from the AUR")
makepkg_cmd = ["makepkg", "-si"]
if noconfirm:
makepkg_cmd.append("--noconfirm")
_try_run(makepkg_cmd, f"failed to build and install {helper}", cwd=repo_dir)
try:
if helper == "yay":
subprocess.run(["yay", "-Y", "--gendb"], check=True)
subprocess.run(["yay", "-Y", "--devel", "--save"], check=True)
elif helper == "paru":
subprocess.run(["paru", "--gendb"], check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
warn(f"failed to run AUR helper post install actions: {e}")
class PackageInstaller(ABC):
@staticmethod
def get(helper: str | None = None, noconfirm: bool = False) -> "PackageInstaller":
"""Pick a package installer: the requested/detected AUR helper on Arch, else a no-op."""
# Not on Arch, can't install packages
if shutil.which("pacman") is None:
return NoopInstaller()
# Explicitly given
if helper:
if not shutil.which(helper):
if helper not in AUR_HELPERS:
fatal(f"given AUR helper {helper} is not installed and is unable to be installed automatically.")
info(f"Given AUR helper not installed. Installing {helper}...")
_install_aur_helper(helper, noconfirm)
return ArchInstaller(helper, noconfirm)
# Not given, find installed one
for candidate in AUR_HELPERS:
if shutil.which(candidate):
return ArchInstaller(candidate, noconfirm)
info(f"No AUR helper found. Installing {DEFAULT_AUR_HELPER}...")
_install_aur_helper(DEFAULT_AUR_HELPER, noconfirm)
return ArchInstaller(DEFAULT_AUR_HELPER, noconfirm)
# --- Abstract methods ---
@abstractmethod
def install(self, packages: list[str]) -> None: ...
@abstractmethod
def remove(self, packages: list[str]) -> None: ...
@abstractmethod
def build_install(self, directory: Path) -> list[str]:
"""Build and install the PKGBUILD in `directory`, returning the installed package names."""
@abstractmethod
def installed_version(self, package: str) -> str | None:
"""Return the installed version of `package`, or None if it is not installed."""
def is_installed(self, package: str) -> bool:
return self.installed_version(package) is not None
@abstractmethod
def needs_rebuild(self, directory: Path, packages: list[str]) -> bool:
"""Whether the PKGBUILD in `directory` would build a version differing from the installed `packages`."""
@abstractmethod
def system_update(self) -> None: ...
class NoopInstaller(PackageInstaller):
"""Used off Arch, where the dots' packages are not available via pacman/AUR."""
def install(self, packages: list[str]) -> None:
if packages:
info(f"Skipping package install (not on Arch): {', '.join(packages)}")
def remove(self, packages: list[str]) -> None:
if packages:
info(f"Skipping package removal (not on Arch): {', '.join(packages)}")
def build_install(self, directory: Path) -> list[str]:
info(f"Skipping local package build (not on Arch): {directory}")
return []
def installed_version(self, package: str) -> str | None:
return None
def needs_rebuild(self, directory: Path, packages: list[str]) -> bool:
return False
def system_update(self) -> None:
info("Skipping system update (not on Arch)")
class ArchInstaller(PackageInstaller):
def __init__(self, helper: str, noconfirm: bool = False) -> None:
self.helper = helper
self.flags = ["--noconfirm"] if noconfirm else []
def install(self, packages: list[str], explicit: bool = True) -> None:
if not packages:
return
cmd = [self.helper, "-S", "--needed", *self.flags]
if not explicit:
cmd.append("--asdeps") # Set install reason to dep (does not affect already installed packages)
_try_run(cmd + packages, f"failed to install packages: {', '.join(packages)}")
# Force install reason to explicit install
if explicit:
# `-D` only accepts real installed names, so resolve any virtual/`provides` names (e.g. awk -> gawk)
resolved = [self._installed_name(pkg) for pkg in packages]
try:
subprocess.run([self.helper, "-D", "--asexplicit", *self.flags, *resolved], check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
warn(f"failed to mark packages as explicitly installed: {', '.join(resolved)}")
def remove(self, packages: list[str]) -> None:
if not packages:
return
_try_run([self.helper, "-Rns", *self.flags, *packages], f"failed to remove packages: {', '.join(packages)}")
def build_install(self, directory: Path) -> list[str]:
fields = _read_srcinfo(directory)
names = fields.get("pkgname", [])
depends = fields.get("depends", [])
self.install(depends, explicit=False)
# Stop makepkg from resetting sudo
env = {**os.environ, "PACMAN_AUTH": "sudo"}
# -f = force, -s = sync deps, -i = install
_try_run(
["makepkg", "-fsi", *self.flags], f"failed to build local package in {directory}", cwd=directory, env=env
)
# Clean build artifacts
for artifact in directory.glob("*.pkg.tar*"):
try:
artifact.unlink()
except OSError as e:
warn(f"failed to remove build artifact {artifact}: {e}")
return names
def _query(self, package: str) -> tuple[str, str] | None:
"""Return the installed (name, version) of `package`, resolving `provides` (e.g. awk -> gawk), or None."""
result = subprocess.run(
["pacman", "-Q", package],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
if result.returncode != 0:
return None
# `pacman -Q` resolves provides and prints "<real name> <version>"
parts = result.stdout.split()
return (parts[0], parts[1]) if len(parts) >= 2 else None
def _installed_name(self, package: str) -> str:
"""Resolve `package` to its real installed name (handles provides), falling back to the given name."""
query = self._query(package)
return query[0] if query else package
def installed_version(self, package: str) -> str | None:
query = self._query(package)
return query[1] if query else None
def needs_rebuild(self, directory: Path, packages: list[str]) -> bool:
built = _srcinfo_version(_read_srcinfo(directory))
if built is None:
return False # Can't determine the source version, leave as is
# Rebuild when installed version < repo version
# Don't rebuild packages that have been removed
return any(
(installed := self.installed_version(pkg)) is not None and _vercmp(built, installed) > 0 for pkg in packages
)
def system_update(self) -> None:
_try_run([self.helper, "-Syu", *self.flags], "failed to perform system update")
+123
View File
@@ -0,0 +1,123 @@
import shutil
import subprocess
from pathlib import Path
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.paths import dots_dir, get_config
class SourceError(Exception):
"""Raised when a git operation against the dots clone fails."""
class DotsSource:
_fetched_source: bool = False
def __init__(self) -> None:
cfg = get_config().get("dots", {})
self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git")
self.branch = cfg.get("branch", "main")
# Cache git blobs by (ref, relpath); objects are immutable for a given rev
self._blob_cache: dict[tuple[str, str], bytes] = {}
@property
def remote_ref(self) -> str:
return f"origin/{self.branch}"
def exists(self) -> bool:
return (dots_dir / ".git").is_dir()
def working_path(self, relpath: str | Path) -> Path:
"""Get a Path relative to the dots dir."""
return dots_dir / relpath
def ensure(self) -> None:
"""Clone the repo if absent, otherwise fetch the latest refs.
If the configured url changed, the stale clone is removed and re-cloned
from the new source.
"""
if self.exists():
if self.current_url() == self.url:
if DotsSource._fetched_source:
return
self._git("fetch", "--prune", "origin", self.branch)
DotsSource._fetched_source = True
return
shutil.rmtree(dots_dir)
dots_dir.parent.mkdir(parents=True, exist_ok=True)
self._run("git", "clone", "--branch", self.branch, self.url, str(dots_dir))
DotsSource._fetched_source = True
def current_url(self) -> str:
return self._git("remote", "get-url", "origin").strip()
def checkout_tip(self) -> str:
"""Reset the working tree to the fetched tip and return its commit hash."""
self._git("reset", "--hard", self.remote_ref)
return self.tip_rev()
def tip_rev(self) -> str:
return self._git("rev-parse", self.remote_ref).strip()
def changed_files(self, base: str, head: str) -> list[str]:
"""Repo-relative paths that differ between two revisions."""
out = self._git("diff", "--name-only", base, head)
return [line for line in out.splitlines() if line]
def has_rev(self, rev: str) -> bool:
"""Whether `rev` resolves to a commit."""
try:
self._git("rev-parse", "--verify", "--quiet", f"{rev}^{{commit}}")
return True
except SourceError:
return False
def clean(self) -> None:
"""Remove all untracked files in the git repo."""
self._git("clean", "-fdx")
# --- Accessors ---
def manifest_at(self, ref: str) -> Manifest:
return Manifest.parse(self.text_at(ref, "manifest.toml"))
def text_at(self, ref: str, relpath: str) -> str:
return self._git("show", f"{ref}:{relpath}")
def blob_at(self, ref: str, relpath: str) -> bytes:
key = (ref, relpath)
if key not in self._blob_cache:
self._blob_cache[key] = self._git_bytes("show", f"{ref}:{relpath}")
return self._blob_cache[key]
def files_at(self, ref: str, relpath: str) -> list[str]:
"""Repo-relative paths of all files under relpath at ref (the path itself if it is a file)."""
out = self._git("ls-tree", "-r", "--name-only", ref, "--", relpath)
return [line for line in out.splitlines() if line]
# --- Helpers ---
def _git(self, *args: str) -> str:
# core.quotePath=false so non-ASCII paths come back verbatim, not octal-escaped
return self._run("git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args)
def _git_bytes(self, *args: str) -> bytes:
cmd = ["git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed")
return result.stdout
def _run(self, *cmd: str) -> str:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise SourceError(result.stderr.strip() or f"{' '.join(cmd)} failed")
return result.stdout
+58
View File
@@ -0,0 +1,58 @@
import json
from dataclasses import dataclass, field
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER
from caelestia.utils.io import warn
from caelestia.utils.paths import atomic_dump, dots_state_path
@dataclass
class DotsState:
# The AUR helper selected selected at install time
aur_helper: str = "paru"
# The git rev of currently applied dots version
applied_rev: str | None = None
# The currently enabled components
enabled_components: list[str] = field(default_factory=list)
# Previously installed packages/local packages
packages: list[str] = field(default_factory=list)
local_packages: dict[str, list[str]] = field(default_factory=dict)
# Files placed by the last deploy. Only files, not directories
# Maps dest -> src
deployed_files: dict[str, str] = field(default_factory=dict)
@staticmethod
def load() -> "DotsState":
try:
data = json.loads(dots_state_path.read_text())
except FileNotFoundError:
return DotsState()
except json.JSONDecodeError:
warn("failed to parse current dots state.")
return DotsState()
return DotsState(
aur_helper=data.get("aur_helper", DEFAULT_AUR_HELPER),
applied_rev=data.get("applied_rev"),
enabled_components=data.get("enabled_components", []),
packages=data.get("packages", []),
local_packages=data.get("local_packages", {}),
deployed_files=data.get("deployed_files", {}),
)
def save(self) -> None:
atomic_dump(
dots_state_path,
{
"aur_helper": self.aur_helper,
"applied_rev": self.applied_rev,
"enabled_components": self.enabled_components,
"packages": self.packages,
"local_packages": self.local_packages,
"deployed_files": self.deployed_files,
},
)
+30
View File
@@ -7,6 +7,7 @@ socket_base = f"{os.getenv('XDG_RUNTIME_DIR')}/hypr/{os.getenv('HYPRLAND_INSTANC
socket_path = f"{socket_base}/.socket.sock"
socket2_path = f"{socket_base}/.socket2.sock"
_lua_config_cache: bool | None = None
def message(msg: str, is_json: bool = True) -> str | dict[str, Any]:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
@@ -26,7 +27,36 @@ def message(msg: str, is_json: bool = True) -> str | dict[str, Any]:
return json.loads(resp) if is_json else resp
def is_lua_config() -> bool:
global _lua_config_cache
if _lua_config_cache is not None:
return _lua_config_cache
try:
result = message("systeminfo", is_json=False)
for line in result.splitlines():
if "configProvider:" in line:
_lua_config_cache = "lua" in line.lower()
return _lua_config_cache
_lua_config_cache = False
return False
except Exception:
_lua_config_cache = False
return False
DISPATCHER_MAP_LUA = {
"togglespecialworkspace": lambda *a: f'hl.dsp.workspace.toggle_special("{a[0]}")' if a else 'hl.dsp.workspace.toggle_special()',
"movetoworkspacesilent": lambda *a: (
f'hl.dsp.window.move({{window = "address:{a[0].split(",")[1].replace("address:", "")}", workspace = "{a[0].split(",")[0]}", follow = false}})'
),
"exec": lambda *a: 'hl.dsp.exec_cmd("' + ' '.join(a).replace('\\', '\\\\').replace('"', '\\"') + '")',
}
def dispatch(dispatcher: str, *args: str) -> bool:
if is_lua_config() and dispatcher in DISPATCHER_MAP_LUA:
lua_dispatch = DISPATCHER_MAP_LUA[dispatcher](*args)
return message(f"dispatch {lua_dispatch}", is_json=False) == "ok"
return message(f"dispatch {dispatcher} {' '.join(map(str, args))}".rstrip(), is_json=False) == "ok"
+139
View File
@@ -0,0 +1,139 @@
import sys
from typing import Never
LOG_COLOUR: int = 2
INFO_COLOUR: int = 0
PROMPT_COLOUR: int = 36
WARNING_COLOUR: int = 33
ERROR_COLOUR: int = 31
_disable_input: bool = False
def disable_input() -> None:
global _disable_input
_disable_input = True
def log_exception(func):
"""Log exceptions to stderr instead of raising.
Used by the `apply_()` functions so that an exception, when applying
a theme, does not prevent the other themes from being applied.
"""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
error(f'exception during "{func.__name__}()": {str(e)}')
return wrapper
def format_msg(colour: int, prefix: bool, msg: str) -> str:
return f"\033[{colour}m{':: ' if prefix else ''}{msg}\033[0m"
def log(msg: str, prefix: bool = True) -> None:
print(format_msg(LOG_COLOUR, prefix, msg))
def info(msg: str, prefix: bool = True) -> None:
print(format_msg(INFO_COLOUR, prefix, msg))
def warn(msg: str, prefix: bool = True) -> None:
print(format_msg(WARNING_COLOUR, prefix, f"Warning: {msg}"))
def error(err: str | Exception, prefix: bool = True) -> None:
print(format_msg(ERROR_COLOUR, prefix, f"Error: {err}"), file=sys.stderr)
def fatal(err: str | Exception, prefix: bool = True) -> Never:
print(format_msg(ERROR_COLOUR, prefix, f"Fatal: {err}"), file=sys.stderr)
sys.exit(1)
def _input(prompt: str) -> str:
if _disable_input:
print(prompt, end="")
return ""
try:
return input(prompt)
except (KeyboardInterrupt, EOFError):
print()
raise KeyboardInterrupt()
def prompt(msg: str, prefix: bool = True, end: str = " ") -> str:
return _input(format_msg(PROMPT_COLOUR, prefix, msg) + end)
def confirm(msg: str, prefix: bool = True, default: bool = True) -> bool:
suffix = " [Y/n]" if default else " [y/N]"
answer = prompt(msg + suffix, prefix=prefix).strip().lower()
if not answer:
return default
return answer in ("y", "yes")
def prompt_selection(items: list[str], header: str) -> list[str]:
"""Prompt the user to pick from a numbered list, returning the selected items.
Accepts `[A]ll`/`a`, single indices, ranges (`1-3`) and exclusions (`^4`).
Empty input selects nothing. Re-prompts until the input parses.
"""
print(format_msg(PROMPT_COLOUR, True, header))
max_idx_w = len(str(len(items)))
for i, item in enumerate(items):
print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{item}"))
print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)"))
def valid_idx(v: str) -> int:
try:
idx = int(v, base=10) - 1 # -1 to translate to 0 index
except ValueError:
raise ValueError(f'Given value "{v}" must be an integer.')
if idx < 0 or idx >= len(items):
raise ValueError(f'Given value "{v}" must be between 1 and {len(items)} inclusive.')
return idx
def parse(ans: str) -> list[str]:
if ans in ("a", "all"):
return list(items)
if not ans:
return []
selected: list[str] = []
for tok in ans.split():
fr, sep, to = tok.partition("-")
if sep:
lo, hi = valid_idx(fr), valid_idx(to)
if lo > hi:
raise ValueError(f'Given range "{tok}" must be lo-hi.')
selected += items[lo : hi + 1]
elif tok.startswith("^"):
t = valid_idx(tok[1:])
selected += items[:t] + items[t + 1 :]
else:
selected.append(items[valid_idx(tok)])
return list(set(selected))
while True:
ans = prompt("", end="").lower().strip()
try:
return parse(ans)
except ValueError as e:
warn(f"invalid input. {e} Please try again.")
def pause() -> None:
if _disable_input:
return
_input("\n\033[2m\033[3m(Ctrl+C to exit, enter to continue)\033[0m")
print("\033[1A\r\033[2K\033[1A\r\033[2K", end="") # Clear pause prompt
-22
View File
@@ -1,22 +0,0 @@
from time import strftime
def log_message(message: str) -> None:
timestamp = strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def log_exception(func):
"""Log exceptions to stdout instead of raising
Used by the `apply_()` functions so that an exception, when applying
a theme, does not prevent the other themes from being applied.
"""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
log_message(f'Error during execution of "{func.__name__}()": {str(e)}')
return wrapper
+1 -1
View File
@@ -18,7 +18,7 @@ from typing import Protocol, Any
# subclasses in get_scheme() handle that internally. This Protocol tells the type
# checker to expect our specific 3-argument setup instead of the base class signature.
class SchemeConstructor(Protocol):
def __call__(self, source_color_hct: Any, is_dark: bool, contrast_level: float) -> DynamicScheme: ...
def __call__(self, source_color_hct: Any, is_dark: bool, contrast_level: float) -> "DynamicScheme": ...
try:
from materialyoucolor.dynamiccolor.dynamic_scheme import DynamicScheme
+31 -5
View File
@@ -1,11 +1,12 @@
import hashlib
import json
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any
from caelestia.utils.io import warn
config_dir: Path = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config"))
data_dir: Path = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local/share"))
state_dir: Path = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local/state"))
@@ -24,6 +25,10 @@ templates_dir: Path = cli_data_dir / "templates"
user_templates_dir: Path = c_config_dir / "templates"
theme_dir: Path = c_state_dir / "theme"
config_backup_dir: Path = config_dir.parent / f"{config_dir.name}.bak"
dots_dir: Path = c_state_dir / "dots"
dots_state_path: Path = c_state_dir / "dots-state.json"
scheme_path: Path = c_state_dir / "scheme.json"
scheme_data_dir: Path = cli_data_dir / "schemes"
scheme_cache_dir: Path = c_cache_dir / "schemes"
@@ -52,8 +57,29 @@ def compute_hash(path: Path | str) -> str:
return sha.hexdigest()
def atomic_dump(path: Path, content: dict[str, Any]) -> None:
with tempfile.NamedTemporaryFile("w") as f:
json.dump(content, f)
def atomic_write(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
f = tempfile.NamedTemporaryFile("w", dir=path.parent, delete=False)
try:
with f:
f.write(content)
f.flush()
shutil.move(f.name, path)
os.fsync(f.fileno())
os.replace(f.name, path)
except BaseException:
os.unlink(f.name)
raise
def atomic_dump(path: Path, content: dict[str, Any]) -> None:
atomic_write(path, json.dumps(content))
def get_config() -> dict[str, Any]:
try:
return json.loads(user_config_path.read_text())
except json.JSONDecodeError:
warn("failed to parse config, invalid JSON")
except FileNotFoundError:
pass
return {}
+32 -35
View File
@@ -8,14 +8,16 @@ import tempfile
from pathlib import Path
from caelestia.utils.colour import get_dynamic_colours
from caelestia.utils.logging import log_exception
from caelestia.utils.hypr import is_lua_config
from caelestia.utils.io import log_exception
from caelestia.utils.paths import (
atomic_write,
c_state_dir,
config_dir,
data_dir,
get_config,
templates_dir,
theme_dir,
user_config_path,
user_templates_dir,
)
from caelestia.utils.scheme import get_scheme
@@ -28,6 +30,14 @@ def gen_conf(colours: dict[str, str]) -> str:
return conf
def gen_lua(colours: dict[str, str]) -> str:
lua = "return {\n"
for name, colour in colours.items():
lua += f' {name} = "{colour}",\n'
lua += "}"
return lua
def gen_scss(colours: dict[str, str]) -> str:
scss = ""
for name, colour in colours.items():
@@ -110,15 +120,6 @@ def gen_sequences(colours: dict[str, str]) -> str:
)
def write_file(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile("w") as f:
f.write(content)
f.flush()
shutil.move(f.name, path)
@log_exception
def apply_terms(sequences: str) -> None:
state = c_state_dir / "sequences.txt"
@@ -144,57 +145,56 @@ def apply_terms(sequences: str) -> None:
@log_exception
def apply_hypr(conf: str) -> None:
write_file(config_dir / "hypr/scheme/current.conf", conf)
ext = "lua" if is_lua_config() else "conf"
atomic_write(config_dir / f"hypr/scheme/current.{ext}", conf)
@log_exception
def apply_discord(scss: str) -> None:
import tempfile
with tempfile.TemporaryDirectory("w") as tmp_dir:
(Path(tmp_dir) / "_colours.scss").write_text(scss)
conf = subprocess.check_output(["sass", "-I", tmp_dir, templates_dir / "discord.scss"], text=True)
for client in "Equicord", "Vencord", "BetterDiscord", "equibop", "vesktop", "legcord":
write_file(config_dir / client / "themes/caelestia.theme.css", conf)
atomic_write(config_dir / client / "themes/caelestia.theme.css", conf)
@log_exception
def apply_pandora(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / "pandora.json", hash=True)
template = template.replace("{{ $mode }}", mode)
write_file(data_dir / "PandoraLauncher/themes/caelestia.json", template)
atomic_write(data_dir / "PandoraLauncher/themes/caelestia.json", template)
@log_exception
def apply_spicetify(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / f"spicetify-{mode}.ini")
write_file(config_dir / "spicetify/Themes/caelestia/color.ini", template)
atomic_write(config_dir / "spicetify/Themes/caelestia/color.ini", template)
@log_exception
def apply_fuzzel(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "fuzzel.ini")
write_file(config_dir / "fuzzel/fuzzel.ini", template)
atomic_write(config_dir / "fuzzel/fuzzel.ini", template)
@log_exception
def apply_btop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "btop.theme", hash=True)
write_file(config_dir / "btop/themes/caelestia.theme", template)
atomic_write(config_dir / "btop/themes/caelestia.theme", template)
subprocess.run(["killall", "-USR2", "btop"], stderr=subprocess.DEVNULL)
@log_exception
def apply_nvtop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "nvtop.colors", hash=True)
write_file(config_dir / "nvtop/nvtop.colors", template)
atomic_write(config_dir / "nvtop/nvtop.colors", template)
@log_exception
def apply_htop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "htop.theme", hash=True)
write_file(config_dir / "htop/htoprc", template)
atomic_write(config_dir / "htop/htoprc", template)
subprocess.run(["killall", "-USR2", "htop"], stderr=subprocess.DEVNULL)
@@ -310,8 +310,8 @@ def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None)
for gtk_version in ["gtk-3.0", "gtk-4.0"]:
gtk_config_dir = config_dir / gtk_version
write_file(gtk_config_dir / "gtk.css", gtk_template)
write_file(gtk_config_dir / "thunar.css", thunar_template)
atomic_write(gtk_config_dir / "gtk.css", gtk_template)
atomic_write(gtk_config_dir / "thunar.css", thunar_template)
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/gtk-theme", "'adw-gtk3-dark'"])
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/color-scheme", f"'prefer-{mode}'"])
@@ -324,13 +324,13 @@ def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None)
@log_exception
def apply_qt(colours: dict[str, str], mode: str, icon_theme: str | None = None) -> None:
colours = gen_replace(colours, templates_dir / f"qt{mode}.colors", hash=True)
write_file(config_dir / "qtengine/caelestia.colors", colours)
atomic_write(config_dir / "qtengine/caelestia.colors", colours)
config = (templates_dir / "qtengine.json").read_text()
config = config.replace("{{ $mode }}", mode.capitalize())
if icon_theme is not None:
config = config.replace(f'"iconTheme": "Papirus-{mode.capitalize()}"', f'"iconTheme": "{icon_theme}"')
write_file(config_dir / "qtengine/config.json", config)
atomic_write(config_dir / "qtengine/config.json", config)
@log_exception
@@ -339,7 +339,7 @@ def apply_warp(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / "warp.yaml", hash=True)
template = template.replace("{{ $warp_mode }}", warp_mode)
write_file(data_dir / "warp-terminal/themes/caelestia.yaml", template)
atomic_write(data_dir / "warp-terminal/themes/caelestia.yaml", template)
@log_exception
@@ -361,7 +361,7 @@ def apply_chromium(colours: dict[str, str]) -> None:
print(f"Unable to create {policy_dir} directory")
continue
# Use tee instead of write_file cause we need sudo
# Use tee instead of atomic_write cause we need sudo
subprocess.run(
["sudo", "-n", "tee", str(policy_dir / "caelestia.json")],
input=json.dumps({"BrowserThemeColor": theme_color, "BrowserColorScheme": "device"}),
@@ -384,13 +384,13 @@ def apply_zed(colours: dict[str, str], mode: str) -> None:
theme_path.unlink()
content = gen_replace_dynamic(colours, templates_dir / "zed.json", mode)
write_file(theme_path, content)
atomic_write(theme_path, content)
@log_exception
def apply_cava(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "cava.conf", hash=True)
write_file(config_dir / "cava/config", template)
atomic_write(config_dir / "cava/config", template)
subprocess.run(["killall", "-USR2", "cava"], stderr=subprocess.DEVNULL)
@@ -402,7 +402,7 @@ def apply_user_templates(colours: dict[str, str], mode: str) -> None:
for file in user_templates_dir.iterdir():
if file.is_file():
content = gen_replace_dynamic(colours, file, mode)
write_file(theme_dir / file.name, content)
atomic_write(theme_dir / file.name, content)
def apply_colours(colours: dict[str, str], mode: str) -> None:
@@ -417,10 +417,7 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
except BlockingIOError:
return
try:
cfg = json.loads(user_config_path.read_text())["theme"]
except (FileNotFoundError, json.JSONDecodeError, KeyError):
cfg = {}
cfg = get_config().get("theme", {})
def check(key: str) -> bool:
return cfg[key] if key in cfg else True
@@ -428,7 +425,7 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
if check("enableTerm"):
apply_terms(gen_sequences(colours))
if check("enableHypr"):
apply_hypr(gen_conf(colours))
apply_hypr(gen_lua(colours) if is_lua_config() else gen_conf(colours))
if check("enableDiscord"):
apply_discord(gen_scss(colours))
if check("enableSpicetify"):
+3 -7
View File
@@ -2,7 +2,6 @@ import json
import os
import random
import subprocess
from argparse import Namespace
from pathlib import Path
from typing import cast
@@ -11,12 +10,12 @@ from materialyoucolor.hct import Hct
from materialyoucolor.utils.color_utils import argb_from_rgb
from PIL import Image
from caelestia.utils.colourfulness import get_variant
from caelestia.utils.hypr import message
from caelestia.utils.material import get_colours_for_image
from caelestia.utils.colourfulness import get_variant
from caelestia.utils.paths import (
compute_hash,
user_config_path,
get_config,
wallpaper_link_path,
wallpaper_path_path,
wallpaper_thumbnail_path,
@@ -186,8 +185,7 @@ def set_wallpaper(wall: Path, no_smart: bool) -> None:
apply_colours(scheme.colours, scheme.mode)
# Run custom post-hook if configured
try:
cfg = json.loads(user_config_path.read_text()).get("wallpaper", {})
cfg = get_config().get("wallpaper", {})
if post_hook := cfg.get("postHook"):
subprocess.run(
post_hook,
@@ -204,8 +202,6 @@ def set_wallpaper(wall: Path, no_smart: bool) -> None:
},
stderr=subprocess.DEVNULL,
)
except (FileNotFoundError, json.JSONDecodeError):
pass
def set_random(args: Namespace) -> None: