98 Commits

Author SHA1 Message Date
2 * r + 2 * t b00dabaa93 chore: update readme 2026-06-20 14:35:26 +10:00
2 * r + 2 * t 9cae22e790 feat: remove app2unit 2026-06-20 14:28:03 +10:00
github-actions 0518ca0131 [CI] chore: update flake 2026-06-20 04:16:19 +00:00
2 * r + 2 * t 702baa117b fix: resolve actual package names
provides=... allows aliasing packages to others, and pacman -D only
takes the actual package names, so we need to resolve them.
2026-06-19 20:17:50 +10:00
2 * r + 2 * t 2e9a387951 fix: update local packages during update cmd (#128)
* fix: update local packages based on pkgver/rel

* fixes

* fix: default no rebuild on vercmp fail
2026-06-19 18:17:02 +10:00
2 * r + 2 * t cfc62f683e fix: deref legacy symlinks (#127)
* fix: deref legacy syms before deploy

Deploy will completely overwrite symlinked dirs, so you'd lose all extra
content inside them
Also deref syms in config backup before deleting legacy dir

* fix: restore link if deref fails
2026-06-19 17:11:39 +10:00
2 * r + 2 * t 096e583618 fix: only match dirs when globs if a tail is given 2026-06-19 02:40:13 +10:00
2 * r + 2 * t 8535338e6f feat: add post package hook + deploy after package
Deploy after package install on install command. Update command stays
the same
2026-06-19 01:28:47 +10:00
2 * r + 2 * t 6c3b69cb84 fix: catch errors with package installation 2026-06-18 22:36:59 +10:00
2 * r + 2 * t f53b3d036f feat: add migration step to install cmd (#126)
* feat: add migration step to install cmd

* fix: set packages to explicitly installed

* fix: legacy remote check

Oops

* fix: generator

* fix: better legacy detection

* fix: run legacy detection before deployment

Also fix unlink on dir

* fix: legacy file check issue

* fix: handle no legacy

* fix: make sure not to go past home when looking for repo

* fix: wrong dir for default legacy path

* fix: catch errors with deleting legacy install
2026-06-18 21:40:05 +10:00
github-actions 7ff0913826 [CI] chore: update flake 2026-06-18 04:43:48 +00:00
2 * r + 2 * t 32a88d4d62 Merge pull request #125 from caelestia-dots/feat/add-update-cmd
feat: add update cmd
2026-06-18 00:59:39 +10:00
2 * r + 2 * t 3d51f46b21 feat: cache git blobs 2026-06-18 00:04:23 +10:00
2 * r + 2 * t 91e55a322f fix: account for fs errors when reading files 2026-06-17 23:57:10 +10:00
2 * r + 2 * t 844f6d22b3 fix: account for moved src 2026-06-17 23:56:42 +10:00
2 * r + 2 * t c09cd1a609 fix: warn on duplicate components in manifest 2026-06-17 23:17:24 +10:00
2 * r + 2 * t 0410fed68c fix: remove untracked files from state 2026-06-17 23:17:08 +10:00
2 * r + 2 * t d83a85745d fix: save state after each update phase
So cancellations per phase don't leave state partially wrong
Also part 2 of the previous commit, wire into updater
2026-06-17 22:13:10 +10:00
2 * r + 2 * t 68a758a53b fix: handle user deleted but upstream changed properly
Also catch error reading blob for new files
2026-06-17 22:11:39 +10:00
2 * r + 2 * t 338c78f789 fix: disable git transforming weird chars
Git transforms non ascii and other chars into octal escaped versions,
which we don't want
2026-06-17 22:10:17 +10:00
2 * r + 2 * t be13e5897b feat: add completions for update cmd 2026-06-17 21:28:58 +10:00
2 * r + 2 * t 4824483bba feat: implement update command 2026-06-17 21:27:30 +10:00
2 * r + 2 * t a0aa37bb9b fix: remove duplicate resolve component 2026-06-17 21:05:46 +10:00
2 * r + 2 * t 710cba39c3 refactor: reusable select prompt + hooks + local build 2026-06-17 20:57:17 +10:00
2 * r + 2 * t c8e18ef6ed refactor: use aur helpers constant for parser 2026-06-17 20:21:21 +10:00
2 * r + 2 * t 222023f6d5 fix: handle applied rev not existing in diff 2026-06-17 20:20:50 +10:00
2 * r + 2 * t 7def47d120 feat: add diff module 2026-06-17 17:46:55 +10:00
github-actions 5e2335feb9 [CI] chore: update flake 2026-06-17 04:49:48 +00:00
2 * r + 2 * t be61b8b072 feat: record files deployed 2026-06-17 01:52:31 +10:00
2 * r + 2 * t 0980986ed4 refactor: move expand/expand_dests to entry methods 2026-06-17 00:58:06 +10:00
github-actions 63a6e5a6f2 [CI] chore: update flake 2026-06-16 05:09:25 +00:00
2 * r + 2 * t ecf6c2723d Merge pull request #123 from caelestia-dots/feat/add-install-cmd
feat: add install command
2026-06-16 01:33:40 +10:00
2 * r + 2 * t 342dfc71e1 fix: wrong docstring 2026-06-16 01:22:30 +10:00
2 * r + 2 * t 51e858b73f fix: mutable default param 2026-06-16 01:22:30 +10:00
2 * r + 2 * t 7d9b685918 fix: only fetch source once 2026-06-16 01:22:30 +10:00
2 * r + 2 * t 1f5b39281c feat: add completions for install cmd 2026-06-15 23:51:35 +10:00
2 * r + 2 * t a8f0dc3271 feat: add install command to parser 2026-06-15 23:48:00 +10:00
2 * r + 2 * t e02fc7427d feat: allow disabling print prefix 2026-06-15 23:47:53 +10:00
2 * r + 2 * t 56f2e94d5b fix: no hooks subobject for consistency
So global hooks are consistent with per component hooks
2026-06-15 23:46:56 +10:00
2 * r + 2 * t d1ed5d9db1 fix: deterministic component ordering
Keep manifest order for package installation
2026-06-15 23:46:56 +10:00
2 * r + 2 * t 73bc3aadab feat: retry on invalid input instead of exiting 2026-06-15 23:46:56 +10:00
2 * r + 2 * t d55647fd03 feat: add more info at end of install
Also add newlines between sections
2026-06-15 23:46:56 +10:00
2 * r + 2 * t 1fc51410fc chore: log -> info for hooks 2026-06-15 23:46:56 +10:00
2 * r + 2 * t a8d67b44ee fix: stop makepkg from resetting sudo 2026-06-15 23:46:56 +10:00
2 * r + 2 * t c93fa1488e fix: align component prompt regardless of digits 2026-06-15 23:46:56 +10:00
2 * r + 2 * t 024df497d1 fix: re-clone repo if url changed 2026-06-15 22:57:16 +10:00
2 * r + 2 * t 994f2d86f5 feat: prompt installing optional components 2026-06-15 22:57:16 +10:00
2 * r + 2 * t efd59b79d9 fix: catch source errors 2026-06-15 22:57:16 +10:00
2 * r + 2 * t e6031ad544 feat: add script for testing in sandbox 2026-06-15 22:57:16 +10:00
github-actions 5c062e6897 [CI] chore: update flake 2026-06-15 05:12:57 +00:00
2 * r + 2 * t f85103eac5 fix: deployer place dir docstring 2026-06-14 21:39:01 +10:00
2 * r + 2 * t aef48072ec fix: actually use component error 2026-06-14 21:29:56 +10:00
2 * r + 2 * t 216547c9c1 fix: ensure only single resolve for manifest comps 2026-06-14 21:25:53 +10:00
2 * r + 2 * t 8627b7b96f fix: use tempdir for aur helper install 2026-06-14 21:23:10 +10:00
2 * r + 2 * t 44df61b22d refactor: set default aur helper to constant 2026-06-14 21:22:58 +10:00
2 * r + 2 * t 36a6029a2c feat: add install command
Not wired yet
2026-06-14 21:14:33 +10:00
github-actions 4090c4fc91 [CI] chore: update flake 2026-06-14 04:57:48 +00:00
2 * r + 2 * t 393dbf6363 feat: allow disabling input in io module 2026-06-13 20:19:22 +10:00
2 * r + 2 * t 586f4d9665 fix: allow logging exceptions + Never fatal return 2026-06-13 20:18:48 +10:00
2 * r + 2 * t 14732e9850 fix: handle ctrl+c/d cleanly
Also add newline before pause prompt
2026-06-13 03:12:06 +10:00
2 * r + 2 * t 1c707d3a16 refactor: make resizer use new loggers 2026-06-13 02:59:06 +10:00
2 * r + 2 * t c236823b76 chore: format 2026-06-13 02:59:06 +10:00
2 * r + 2 * t 002a9c287f refactor: add get_config func 2026-06-13 02:59:06 +10:00
2 * r + 2 * t d7b65b5946 refactor: move atomic write to paths
Also make it a true atomic write via os.rename (create temp in parent
dir so guaranteed same fs)
2026-06-13 02:14:11 +10:00
2 * r + 2 * t c860b389c3 refactor: rename logging -> io + add more funcs 2026-06-13 01:50:55 +10:00
github-actions 3f3229aed4 [CI] chore: update flake 2026-06-12 04:44:32 +00:00
github-actions b790c32715 [CI] chore: update flake 2026-06-11 04:41:47 +00:00
github-actions 6ddbb4f1c3 [CI] chore: update flake 2026-06-10 04:19:30 +00:00
github-actions 05b7714289 [CI] chore: update flake 2026-06-09 04:08:00 +00:00
github-actions 84790f8fc3 [CI] chore: update flake 2026-06-08 04:53:59 +00:00
github-actions 505a02f5ab [CI] chore: update flake 2026-06-07 04:47:43 +00:00
github-actions d1c8c8fc09 [CI] chore: update flake 2026-06-02 04:44:32 +00:00
2 * r + 2 * t ad533a0dd4 fix: only screenshot focused monitor in fs mode 2026-06-01 21:06:04 +10:00
İlyas ccd2712982 fix: Lua dispatcher compat (#112)
* fix: temporary Lua dispatcher compat for workspace dispatchers

* fix(resizer): add Lua dispatcher compat for window resize/move/float/center

* feat(theme): write current.lua for Hyprland Lua config, current.conf for hyprlang

* fix: align gen_lua format with #111

* refactor address review feedback
refactor(hypr,theme): address review feedback

- cache is_lua_config result to avoid redundant socket calls
- remove is_lua_config wrapper, rename _is_lua_config to is_lua_config
- move hypr import to top of theme.py
- use single line syntax in apply_hypr and apply_colours

* restore original specialws behavior and some formatting
2026-05-31 23:48:33 +10:00
github-actions 1ea661859d [CI] chore: update flake 2026-05-31 04:25:32 +00:00
github-actions 64a5507e74 [CI] chore: update flake 2026-05-26 04:08:39 +00:00
github-actions 7fa3fc1bd0 [CI] chore: update flake 2026-05-24 04:20:58 +00:00
github-actions 7f30062670 [CI] chore: update flake 2026-05-23 04:01:11 +00:00
github-actions 04d286eaff [CI] chore: update flake 2026-05-17 04:09:50 +00:00
github-actions 2ce6213698 [CI] chore: update flake 2026-05-13 03:57:27 +00:00
Zynix 4b3ffcd644 fix: defer DynamicScheme annotation evaluation (#110) 2026-05-11 16:48:11 +10:00
github-actions 2621724c55 [CI] chore: update flake 2026-05-10 04:03:00 +00:00
github-actions 7b8a4281aa [CI] chore: update flake 2026-05-07 03:45:02 +00:00
github-actions 7452974dc9 [CI] chore: update flake 2026-05-06 03:53:45 +00:00
github-actions 544b567668 [CI] chore: update flake 2026-05-05 03:33:35 +00:00
github-actions 1f523c7556 [CI] chore: update flake 2026-05-03 04:00:27 +00:00
2 * r + 2 * t a00e71d6b7 docs: add iconTheme options to example conf 2026-05-02 22:51:35 +10:00
2 * r + 2 * t 1ec969d9ec fix: use auto bars for cava 2026-05-02 22:48:41 +10:00
2 * r + 2 * t 5273ed514f feat: add theme postHook 2026-05-02 22:24:26 +10:00
github-actions f3b13affc3 [CI] chore: update flake 2026-05-02 03:39:49 +00:00
Haikal 5c9ce66c03 feat: expose more environment variables in post-hook (#107)
* feat: expose more environment variables in post-hook

* fix: formatted
2026-04-29 23:56:07 +10:00
github-actions c18f749f24 [CI] chore: update flake 2026-04-29 03:43:15 +00:00
2 * r + 2 * t 96fcdf5bce fix: use hypr socket instead of hyprctl 2026-04-28 21:20:40 +10:00
github-actions eddee4deca [CI] chore: update flake 2026-04-25 03:06:00 +00:00
Foxlike Creature 68bc03bc17 feat: allow overriding icon theme via cli.json (#106)
* theme: allow overriding Qt icon theme via cli.json

Papirus colors XDG special folders (Downloads, Pictures, Music, etc.)
differently from regular ones - they end up a different color while
everything else stays neutral. With themes like breeze-dark, all folder
icons share the same style, so everything looks consistent.

Add optional `iconTheme` field to the `theme` section of cli.json.
When set, it replaces the Papirus icon theme in the generated qtengine
config with the specified theme.

Example usage in cli.json:
  "theme": { "iconTheme": "breeze-dark" }

* theme: allow overriding Qt and GTK icon theme via cli.json

Some folders in Dolphin end up with Papirus-style icons while others
use the default theme icons, resulting in two different icon styles
mixed together in the same view. Dolphin's default folder icons take
their color directly from the active color scheme, so they always match
the theme exactly - Papirus has a fixed, limited palette and does not
always match.

Add optional iconThemeDark and iconThemeLight fields to the theme
section of cli.json. When set, they override the Papirus icon theme in
both the generated qtengine config and the GTK dconf setting. A generic
iconTheme field is also supported as a fallback for both modes.

Example usage in cli.json:
  "theme": { "iconThemeDark": "breeze-dark", "iconThemeLight": "breeze" }

---------

Co-authored-by: Foxlike Creature <safonovkirill113@gmail.com>
2026-04-24 14:55:47 +10:00
github-actions 023a30b83c [CI] chore: update flake 2026-04-21 03:27:51 +00:00
github-actions a192efae9c [CI] chore: update flake 2026-04-20 03:35:55 +00:00
2 * r + 2 * t 463f36544a docs: add missing theme.enable* opts to example conf 2026-04-19 16:01:09 +10:00
29 changed files with 2096 additions and 180 deletions
+20 -4
View File
@@ -8,7 +8,6 @@ The main control script for the Caelestia dotfiles.
- [`swappy`](https://github.com/jtheoof/swappy) - screenshot editor - [`swappy`](https://github.com/jtheoof/swappy) - screenshot editor
- [`grim`](https://gitlab.freedesktop.org/emersion/grim) - taking screenshots - [`grim`](https://gitlab.freedesktop.org/emersion/grim) - taking screenshots
- [`dart-sass`](https://github.com/sass/dart-sass) - discord theming - [`dart-sass`](https://github.com/sass/dart-sass) - discord theming
- [`app2unit`](https://github.com/Vladimir-csp/app2unit) - launching apps
- [`wl-clipboard`](https://github.com/bugaevc/wl-clipboard) - copying to clipboard - [`wl-clipboard`](https://github.com/bugaevc/wl-clipboard) - copying to clipboard
- [`slurp`](https://github.com/emersion/slurp) - selecting an area - [`slurp`](https://github.com/emersion/slurp) - selecting an area
- [`gpu-screen-recorder`](https://git.dec05eba.com/gpu-screen-recorder/about) - screen recording - [`gpu-screen-recorder`](https://git.dec05eba.com/gpu-screen-recorder/about) - screen recording
@@ -77,7 +76,7 @@ Install all [dependencies](#dependencies), then install
e.g. via an AUR helper (yay) e.g. via an AUR helper (yay)
```sh ```sh
yay -S libnotify swappy grim dart-sass app2unit wl-clipboard slurp gpu-screen-recorder glib2 cliphist fuzzel python-build python-installer python-hatch python-hatch-vcs yay -S libnotify swappy grim dart-sass wl-clipboard slurp gpu-screen-recorder glib2 cliphist fuzzel python-build python-installer python-hatch python-hatch-vcs
``` ```
Now, clone the repo, `cd` into it, build the wheel via `python -m build --wheel` Now, clone the repo, `cd` into it, build the wheel via `python -m build --wheel`
@@ -159,6 +158,8 @@ subcommands:
emoji emoji/glyph utilities emoji emoji/glyph utilities
wallpaper manage the wallpaper wallpaper manage the wallpaper
resizer window resizer daemon resizer window resizer daemon
install install the Caelestia dotfiles
update update the Caelestia dotfiles
``` ```
### User templates ### User templates
@@ -191,17 +192,28 @@ All configuration options are in `~/.config/caelestia/cli.json`.
"extraArgs": [] "extraArgs": []
}, },
"wallpaper": { "wallpaper": {
"postHook": "echo $WALLPAPER_PATH" "postHook": "echo $WALLPAPER_PATH $SCHEME_NAME $SCHEME_FLAVOUR $SCHEME_MODE $SCHEME_VARIANT $SCHEME_COLOURS"
}, },
"theme": { "theme": {
"enableTerm": true, "enableTerm": true,
"enableHypr": true, "enableHypr": true,
"enableDiscord": true, "enableDiscord": true,
"enableSpicetify": true, "enableSpicetify": true,
"enablePandora": true,
"enableFuzzel": true, "enableFuzzel": true,
"enableBtop": true, "enableBtop": true,
"enableNvtop": true,
"enableHtop": true,
"enableGtk": true, "enableGtk": true,
"enableQt": true "enableQt": true,
"enableWarp": true,
"enableChromium": true,
"enableZed": true,
"enableCava": true,
"iconTheme": "Papirus-Dark",
"iconThemeLight": "Papirus-Light",
"iconThemeDark": "Papirus-Dark",
"postHook": "echo $SCHEME_NAME $SCHEME_FLAVOUR $SCHEME_MODE $SCHEME_VARIANT $SCHEME_COLOURS"
}, },
"toggles": { "toggles": {
"communication": { "communication": {
@@ -245,6 +257,10 @@ All configuration options are in `~/.config/caelestia/cli.json`.
"move": true "move": true
} }
} }
},
"dots": {
"url": "https://github.com/caelestia-dots/caelestia.git",
"branch": "main"
} }
} }
``` ```
+9
View File
@@ -0,0 +1,9 @@
#!/usr/bin/env sh
export HOME=/tmp/install-test
export XDG_CONFIG_HOME=$HOME/.config
export XDG_DATA_HOME=$HOME/.local/share
export XDG_STATE_HOME=$HOME/.local/state
export XDG_CACHE_HOME=$HOME/.cache
"$@"
+13 -1
View File
@@ -1,7 +1,7 @@
set -l seen '__fish_seen_subcommand_from' set -l seen '__fish_seen_subcommand_from'
set -l has_opt '__fish_contains_opt' set -l has_opt '__fish_contains_opt'
set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer set -l commands shell toggle scheme screenshot record clipboard emoji-picker wallpaper resizer install update
set -l not_seen "not $seen $commands" set -l not_seen "not $seen $commands"
# Disable file completions # Disable file completions
@@ -20,6 +20,8 @@ complete -c caelestia -n $not_seen -a 'clipboard' -d 'Open clipboard history'
complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities' complete -c caelestia -n $not_seen -a 'emoji' -d 'Emoji/glyph utilities'
complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper' complete -c caelestia -n $not_seen -a 'wallpaper' -d 'Manage the wallpaper'
complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer' complete -c caelestia -n $not_seen -a 'resizer' -d 'Window resizer'
complete -c caelestia -n $not_seen -a 'install' -d 'Install the Caelestia dotfiles'
complete -c caelestia -n $not_seen -a 'update' -d 'Update the Caelestia dotfiles'
# Shell # Shell
set -l commands mpris drawers wallpaper notifs set -l commands mpris drawers wallpaper notifs
@@ -126,3 +128,13 @@ complete -c caelestia -n "$seen emoji" -s 'f' -l 'fetch' -d 'Fetch emoji/glyph d
complete -c caelestia -n "$seen resizer" -s 'd' -l 'daemon' -d 'Start in daemon mode' complete -c caelestia -n "$seen resizer" -s 'd' -l 'daemon' -d 'Start in daemon mode'
complete -c caelestia -n "$seen resizer" -a 'pip' -d 'Quick pip mode' complete -c caelestia -n "$seen resizer" -a 'pip' -d 'Quick pip mode'
complete -c caelestia -n "$seen resizer" -a 'active' -d 'Select the active window' complete -c caelestia -n "$seen resizer" -a 'active' -d 'Select the active window'
# Install (component flags come from the manifest, so are not completed statically)
complete -c caelestia -n "$seen install" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r
complete -c caelestia -n "$seen install" -l 'enable-components' -d 'List of components to enable' -r
complete -c caelestia -n "$seen install" -l 'disable-components' -d 'List of components to disable' -r
complete -c caelestia -n "$seen install" -l 'noconfirm' -d 'Use defaults for all prompts'
# Update
complete -c caelestia -n "$seen update" -l 'aur-helper' -d 'The AUR helper to use' -a 'yay paru' -r
complete -c caelestia -n "$seen update" -l 'noconfirm' -d 'Use defaults for all prompts'
+4 -5
View File
@@ -8,7 +8,7 @@
slurp, slurp,
wl-clipboard, wl-clipboard,
cliphist, cliphist,
app2unit, xdg-utils,
dart-sass, dart-sass,
grim, grim,
fuzzel, fuzzel,
@@ -46,7 +46,7 @@ python3.pkgs.buildPythonApplication {
slurp slurp
wl-clipboard wl-clipboard
cliphist cliphist
app2unit xdg-utils
dart-sass dart-sass
grim grim
fuzzel fuzzel
@@ -65,11 +65,10 @@ python3.pkgs.buildPythonApplication {
substituteInPlace src/caelestia/subcommands/screenshot.py \ substituteInPlace src/caelestia/subcommands/screenshot.py \
--replace-fail '"qs", "-c", "caelestia"' '"caelestia-shell"' --replace-fail '"qs", "-c", "caelestia"' '"caelestia-shell"'
# Use config bin instead of discord + fix todoist + fix app2unit # Use config bin instead of discord + fix todoist
substituteInPlace src/caelestia/subcommands/toggle.py \ substituteInPlace src/caelestia/subcommands/toggle.py \
--replace-fail 'discord' ${discordBin} \ --replace-fail 'discord' ${discordBin} \
--replace-fail '["todoist"]' '["todoist.desktop"]'\ --replace-fail '["todoist"]' '["todoist.desktop"]'
--replace-fail 'app2unit' ${app2unit}/bin/app2unit
# Use config style instead of darkly # Use config style instead of darkly
substituteInPlace src/caelestia/data/templates/qtengine.json \ substituteInPlace src/caelestia/data/templates/qtengine.json \
Generated
+28 -10
View File
@@ -3,17 +3,18 @@
"caelestia-shell": { "caelestia-shell": {
"inputs": { "inputs": {
"caelestia-cli": [], "caelestia-cli": [],
"m3shapes": "m3shapes",
"nixpkgs": [ "nixpkgs": [
"nixpkgs" "nixpkgs"
], ],
"quickshell": "quickshell" "quickshell": "quickshell"
}, },
"locked": { "locked": {
"lastModified": 1775801889, "lastModified": 1781850732,
"narHash": "sha256-q1LGwhQbNOurIAClh5YwKVU2kJ5lTCxRYZf48bAb9IM=", "narHash": "sha256-YKAWz4bSguUWwc1GxOHXRFl4fT+t9WnA2VoZGIRdFVc=",
"owner": "caelestia-dots", "owner": "caelestia-dots",
"repo": "shell", "repo": "shell",
"rev": "0e07176ff149d02391531c802b51c28e73185f30", "rev": "37e603fbf6f973a09f451553b61ac584d9877cf1",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -22,13 +23,30 @@
"type": "github" "type": "github"
} }
}, },
"m3shapes": {
"flake": false,
"locked": {
"lastModified": 1781017666,
"narHash": "sha256-kfHyzZaPHgqZML48OA+5JwBOsLdQJ2ci/aGPShvUB4Y=",
"owner": "soramanew",
"repo": "m3shapes",
"rev": "bdc327b29f95394a732baf3c9b19658ba23755b6",
"type": "github"
},
"original": {
"owner": "soramanew",
"repo": "m3shapes",
"rev": "bdc327b29f95394a732baf3c9b19658ba23755b6",
"type": "github"
}
},
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1775710090, "lastModified": 1781577229,
"narHash": "sha256-ar3rofg+awPB8QXDaFJhJ2jJhu+KqN/PRCXeyuXR76E=", "narHash": "sha256-lrp67w8AulE9Ks53n27I45ADSzbOCn4H+CNW1Ck8B+8=",
"owner": "nixos", "owner": "nixos",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "4c1018dae018162ec878d42fec712642d214fdfa", "rev": "567a49d1913ce81ac6e9582e3553dd90a955875f",
"type": "github" "type": "github"
}, },
"original": { "original": {
@@ -46,11 +64,11 @@
] ]
}, },
"locked": { "locked": {
"lastModified": 1772925576, "lastModified": 1781053488,
"narHash": "sha256-mMoiXABDtkSJxCYDrkhJ/TrrJf5M46oUfIlJvv2gkZ0=", "narHash": "sha256-P4WEBaKgl8flRckHxXGHzT0potPvB3x8ZFIp9gLEAMY=",
"ref": "refs/heads/master", "ref": "refs/heads/master",
"rev": "15a84097653593dd15fad59a56befc2b7bdc270d", "rev": "d99d87d5e5ec4e696815348692fdaaf0b6be1b2c",
"revCount": 750, "revCount": 822,
"type": "git", "type": "git",
"url": "https://git.outfoxxed.me/outfoxxed/quickshell" "url": "https://git.outfoxxed.me/outfoxxed/quickshell"
}, },
+4
View File
@@ -1,8 +1,10 @@
from caelestia.parser import parse_args from caelestia.parser import parse_args
from caelestia.utils.io import log
from caelestia.utils.version import print_version from caelestia.utils.version import print_version
def main() -> None: def main() -> None:
try:
parser, args = parse_args() parser, args = parse_args()
if args.version: if args.version:
print_version() print_version()
@@ -10,3 +12,5 @@ def main() -> None:
args.cls(args).run() args.cls(args).run()
else: else:
parser.print_help() parser.print_help()
except KeyboardInterrupt:
log("Exiting...")
-2
View File
@@ -2,8 +2,6 @@
# Optimized for smooth and responsive visualization # Optimized for smooth and responsive visualization
[general] [general]
# Number of bars (20-200) - fewer bars = better performance
bars = 64
# Framerate (1-144) - higher = smoother but more CPU intensive # Framerate (1-144) - higher = smoother but more CPU intensive
framerate = 60 framerate = 60
+76 -1
View File
@@ -1,6 +1,23 @@
import argparse import argparse
import sys
from caelestia.subcommands import clipboard, emoji, record, resizer, scheme, screenshot, shell, toggle, wallpaper from caelestia.subcommands import (
clipboard,
emoji,
install,
record,
resizer,
scheme,
screenshot,
shell,
toggle,
update,
wallpaper,
)
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.packages import AUR_HELPERS
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import warn
from caelestia.utils.paths import wallpapers_dir from caelestia.utils.paths import wallpapers_dir
from caelestia.utils.scheme import get_scheme_names, scheme_variants from caelestia.utils.scheme import get_scheme_names, scheme_variants
from caelestia.utils.wallpaper import get_wallpaper from caelestia.utils.wallpaper import get_wallpaper
@@ -128,4 +145,62 @@ def parse_args() -> tuple[argparse.ArgumentParser, argparse.Namespace]:
resizer_parser.add_argument("height", nargs="?", help="height to resize to") resizer_parser.add_argument("height", nargs="?", help="height to resize to")
resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)") resizer_parser.add_argument("actions", nargs="?", help="comma-separated actions to apply (float,center,pip)")
# Create parser for install opts
install_parser = command_parser.add_parser(
"install",
help="install the Caelestia dotfiles",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
install_parser.set_defaults(cls=install.Command)
install_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use")
install_parser.add_argument(
"--enable-components", metavar="LIST", help="comma-separated list of components to enable"
)
install_parser.add_argument(
"--disable-components", metavar="LIST", help="comma-separated list of components to disable"
)
install_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
_set_install_epilog(install_parser)
# Create parser for update opts
update_parser = command_parser.add_parser("update", help="update the Caelestia dotfiles")
update_parser.set_defaults(cls=update.Command)
update_parser.add_argument("--aur-helper", choices=AUR_HELPERS, help="the AUR helper to use")
update_parser.add_argument("--noconfirm", action="store_true", help="use defaults for all prompts")
return parser, parser.parse_args() return parser, parser.parse_args()
def _set_install_epilog(install_parser: argparse.ArgumentParser) -> None:
"""Add components if using install subcommand"""
if len(sys.argv) > 1 and sys.argv[1] == "install":
manifest = _load_install_manifest()
if manifest is not None and manifest.components:
install_parser.epilog = _components_epilog(manifest)
def _load_install_manifest() -> Manifest | None:
source = DotsSource()
try:
source.ensure()
return source.manifest_at(source.remote_ref)
except Exception as e:
warn(f"failed to load manifest from dots repo ({e})\n", prefix=False)
return None
def _components_epilog(manifest: Manifest) -> str:
def e(*v: int) -> str:
return f"\033[{';'.join(str(c) for c in v)}m"
def b(c: int) -> str:
return e(1, c)
reset = e(0)
width = max(len(name) for name in manifest.components)
lines = [f"{b(34)}available components (for --enable-components / --disable-components):{reset}"]
for name, comp in manifest.components.items():
lines.append(f" {b(32)}{name:<{width}}{reset}\t{'(default)' if comp.default else '(off)'}")
return "\n".join(lines)
+266
View File
@@ -0,0 +1,266 @@
import shutil
import textwrap
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.legacy import (
LEGACY_META_PKG,
detect_legacy_repo,
legacy_config_symlinks,
legacy_symlinks,
legacy_to_delete,
)
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError
from caelestia.utils.dots.misc import build_local_packages, run_hooks
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER, PackageError, PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import confirm, disable_input, fatal, info, log, pause, prompt_selection, warn
from caelestia.utils.paths import (
config_backup_dir,
config_dir,
)
def _parse_list_arg(value: str | None) -> list[str] | None:
if value is None:
return None
return [item.strip() for item in value.split(",") if item.strip()]
def _deref_symlink(link: Path, target: Path) -> None:
"""Replace symlink `link` with a real copy of `target`'s content."""
bak = link.rename(link.parent / f"{link.name}.bak")
try:
if target.is_dir():
shutil.copytree(target, link, symlinks=True)
else:
shutil.copy2(target, link)
except OSError:
bak.rename(link)
raise
bak.unlink()
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.noconfirm:
disable_input()
self.print_greeting()
self.create_backup()
legacy_dir = detect_legacy_repo() # Detect legacy repo first cause deploy overwrites legacy syms
source, tip, manifest = self.fetch_manifest()
try:
installer, packages, local_packages = self.install_packages(source, manifest)
except PackageError as e:
fatal(e)
run_hooks(manifest, "post_package")
self.dereference_legacy(legacy_dir) # Copy legacy content into place before deploy overwrites the symlinks
deployed = self.deploy_configs(source, manifest)
run_hooks(manifest, "post_install")
DotsState(
aur_helper=getattr(installer, "helper", DEFAULT_AUR_HELPER),
applied_rev=tip,
enabled_components=manifest.enabled_components,
packages=packages,
local_packages=local_packages,
deployed_files=deployed,
).save()
self.migrate_legacy(installer, legacy_dir)
self.print_done()
def print_greeting(self) -> None:
print(
"\033[38;2;150;241;241m" # Caelestia colour
+ textwrap.dedent(
r"""
╭─────────────────────────────────────────────────╮
│ ______ __ __ _ │
│ / ____/___ ____ / /__ _____/ /_(_)___ _ │
│ / / / __ `/ _ \/ / _ \/ ___/ __/ / __ `/ │
│ / /___/ /_/ / __/ / __(__ ) /_/ / /_/ / │
\____/\__,_/\___/_/\___/____/\__/_/\__,_/ │
│ │
╰─────────────────────────────────────────────────╯
"""
)
+ "\033[0m"
)
info("Welcome to the Caelestia dotfiles installer!")
info("Here's a quick overview on what this command is going to do:")
info(" - Install dependencies")
info(" - Install config files")
info("The installer does NOT set up hardware/system level configs (e.g. drivers). Please do this yourself.")
pause()
print()
def create_backup(self) -> None:
if config_dir.exists():
if not confirm("Back up the config directory?", default=True):
return
log(f"Creating a backup of {config_dir}...")
if config_backup_dir.exists():
if not confirm("A backup already exists, overwrite?", default=False):
info("Not creating backup.")
return
log("Deleting old backup...")
shutil.rmtree(config_backup_dir)
shutil.copytree(config_dir, config_backup_dir, symlinks=True)
info(f"Created backup at {config_backup_dir}")
def fetch_manifest(self) -> tuple[DotsSource, str, Manifest]:
print()
log("Fetching dots repo...")
source = DotsSource()
try:
source.ensure()
tip = source.checkout_tip()
except SourceError as e:
fatal(e)
enable = _parse_list_arg(self.args.enable_components)
disable = _parse_list_arg(self.args.disable_components)
try:
manifest = source.manifest_at(tip)
# No flags given, prompt user for non-default components
if enable is None and disable is None:
optional = [name for name, comp in manifest.components.items() if not comp.default]
if optional:
enable = prompt_selection(optional, "Components to enable?")
manifest.resolve_components(enable=enable, disable=disable)
except (SourceError, ManifestError, ComponentError) as e:
fatal(e)
names = ", ".join(manifest.enabled_components) or "none"
info(f"Enabled components: {names}")
return source, tip, manifest
def deploy_configs(self, source: DotsSource, manifest: Manifest) -> dict[str, str]:
print()
log("Installing configs...")
deployer = Deployer()
for entry in manifest.enabled_entries():
src = source.working_path(entry.expanded_src())
if not src.exists():
warn(f"missing in source, skipping: {entry.src}")
continue
dests = entry.expanded_dests()
if not dests:
warn(f"dest glob matched nothing, skipping: {entry.dest}")
continue
for dest in dests:
deployer.place(src, Path(dest))
info(f"{entry.src} -> {dest}")
return deployer.deployed_files
def install_packages(
self, source: DotsSource, manifest: Manifest
) -> tuple[PackageInstaller, list[str], dict[str, list[str]]]:
installer = PackageInstaller.get(self.args.aur_helper, self.args.noconfirm)
packages = manifest.enabled_packages()
if packages:
print()
log("Installing packages...")
installer.install(packages)
local_packages = {}
local_dirs = manifest.enabled_local_packages()
if local_dirs:
print()
log("Building local packages...")
local_packages = build_local_packages(installer, source, local_dirs)
return installer, packages, local_packages
def dereference_legacy(self, legacy_dir: Path | None) -> None:
"""Replace legacy symlinks with real copies of their targets."""
symlinks = legacy_symlinks(legacy_dir)
if not symlinks:
return
print()
log("Preserving content from legacy symlinks...")
for path in symlinks:
target = path.resolve()
if not target.exists():
continue
try:
_deref_symlink(path, target)
info(f"Copied {target} -> {path}")
except OSError as e:
warn(f"failed to preserve {path}: {e}")
def deref_backup_syms(self, legacy_dir: Path | None) -> None:
"""Deref the backup's legacy symlinks before the repo is cleared, so the backup keeps real content."""
if not config_backup_dir.is_dir():
return
for link in legacy_config_symlinks(config_backup_dir, legacy_dir):
target = link.resolve()
if not target.exists():
continue
try:
_deref_symlink(link, target)
except OSError as e:
warn(f"failed to preserve {link} in backup: {e}")
def migrate_legacy(self, installer: PackageInstaller, legacy_dir: Path | None) -> None:
"""Clean up a previous install.fish setup (repo, symlinks and metapackage)."""
to_delete = legacy_to_delete(legacy_dir)
meta_installed = installer.is_installed(LEGACY_META_PKG)
if not to_delete and not meta_installed:
return
print()
log("Found a legacy Caelestia installation...")
if not confirm("Clear legacy installation?"):
return
deployer = Deployer()
try:
self.deref_backup_syms(legacy_dir)
for path in to_delete:
deployer.remove(path)
info(f"Deleted {path}")
if meta_installed:
log("Removing legacy meta package...")
installer.remove([LEGACY_META_PKG])
except (OSError, PackageError) as e:
warn(f"could not fully clear the legacy installation: {e}")
def print_done(self) -> None:
print()
info("All done! Caelestia has been installed.")
info("A few things to finish up:")
info(" - A reboot is recommended for all changes take effect")
info(" - Edit `~/.config/caelestia/hypr-vars.conf` to set default apps, keybinds and much more")
info(" - Edit `~/.config/caelestia/hypr-user.conf` to set your monitor layout and other Hyprland configs")
info(" - Run `caelestia update` later to pull in the latest changes")
info("Enjoy! For support (or to just hang out), join our Discord server: https://discord.gg/BGDCFCmMBk")
+6 -8
View File
@@ -1,4 +1,3 @@
import json
import re import re
import shutil import shutil
import subprocess import subprocess
@@ -7,8 +6,9 @@ from argparse import Namespace
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from caelestia.utils import hypr
from caelestia.utils.notify import close_notification, notify from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir, user_config_path from caelestia.utils.paths import get_config, recording_notif_path, recording_path, recordings_dir
RECORDER = "gpu-screen-recorder" RECORDER = "gpu-screen-recorder"
@@ -36,7 +36,7 @@ class Command:
def start(self) -> None: def start(self) -> None:
args = ["-w"] args = ["-w"]
monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"])) monitors = hypr.message("monitors")
if self.args.region: if self.args.region:
if self.args.region == "slurp": if self.args.region == "slurp":
region = subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True) region = subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True)
@@ -64,12 +64,10 @@ class Command:
if self.args.sound: if self.args.sound:
args += ["-a", "default_output"] args += ["-a", "default_output"]
config = get_config()
try: try:
config = json.loads(user_config_path.read_text())
if "record" in config and "extraArgs" in config["record"]: if "record" in config and "extraArgs" in config["record"]:
args += config["record"]["extraArgs"] args += config["record"]["extraArgs"]
except (json.JSONDecodeError, FileNotFoundError):
pass
except TypeError as e: except TypeError as e:
raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}") raise ValueError(f"Config option 'record.extraArgs' should be an array: {e}")
@@ -122,7 +120,7 @@ class Command:
) )
if action == "watch": if action == "watch":
subprocess.Popen(["app2unit", "-O", new_path], start_new_session=True) subprocess.Popen(["xdg-open", new_path], start_new_session=True)
elif action == "open": elif action == "open":
p = subprocess.run( p = subprocess.run(
[ [
@@ -137,6 +135,6 @@ class Command:
] ]
) )
if p.returncode != 0: if p.returncode != 0:
subprocess.Popen(["app2unit", "-O", new_path.parent], start_new_session=True) subprocess.Popen(["xdg-open", new_path.parent], start_new_session=True)
elif action == "delete": elif action == "delete":
new_path.unlink() new_path.unlink()
+71 -55
View File
@@ -7,8 +7,8 @@ from pathlib import Path
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from caelestia.utils import hypr from caelestia.utils import hypr
from caelestia.utils.logging import log_message from caelestia.utils.io import error, fatal, info, log, warn
from caelestia.utils.paths import user_config_path from caelestia.utils.paths import get_config
class WindowRule: class WindowRule:
@@ -26,14 +26,34 @@ class Command:
self.timeout_tracker: dict[str, float] = {} self.timeout_tracker: dict[str, float] = {}
self.window_rules = self._load_window_rules() self.window_rules = self._load_window_rules()
def _make_resize_cmd(self, width: int | str, height: int | str, address: str) -> str:
if hypr.is_lua_config():
return f'dispatch hl.dsp.window.resize({{x = {width}, y = {height}, exact = true, window = "address:{address}"}})'
return f"dispatch resizewindowpixel exact {width} {height},address:{address}"
def _make_move_cmd(self, x: int, y: int, address: str) -> str:
if hypr.is_lua_config():
return f'dispatch hl.dsp.window.move({{x = {x}, y = {y}, window = "address:{address}"}})'
return f"dispatch movewindowpixel exact {x} {y},address:{address}"
def _make_float_cmd(self, address: str) -> str:
if hypr.is_lua_config():
return f'dispatch hl.dsp.window.float({{action = "toggle", window = "address:{address}"}})'
return f"dispatch togglefloating address:{address}"
def _make_center_cmd(self) -> str:
if hypr.is_lua_config():
return "dispatch hl.dsp.window.center()"
return "dispatch centerwindow"
def _load_window_rules(self) -> list[WindowRule]: def _load_window_rules(self) -> list[WindowRule]:
default_rules = [ default_rules = [
WindowRule("(Bitwarden", "titleContains", "20%", "54%", ["float", "center"]), WindowRule("(Bitwarden", "titleContains", "20%", "54%", ["float", "center"]),
WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]), WindowRule("^[Pp]icture(-| )in(-| )[Pp]icture$", "titleRegex", "", "", ["pip"]),
] ]
config = get_config()
try: try:
config = json.loads(user_config_path.read_text())
if "resizer" in config and "rules" in config["resizer"]: if "resizer" in config and "rules" in config["resizer"]:
rules = [] rules = []
for rule_config in config["resizer"]["rules"]: for rule_config in config["resizer"]["rules"]:
@@ -47,8 +67,8 @@ class Command:
) )
) )
return rules return rules
except (json.JSONDecodeError, KeyError): except KeyError:
log_message("ERROR: invalid config") warn("invalid config, falling back to default rules")
except FileNotFoundError: except FileNotFoundError:
pass pass
@@ -164,16 +184,14 @@ class Command:
move_x = monitor_x + monitor_width - scaled_width - offset move_x = monitor_x + monitor_width - scaled_width - offset
move_y = monitor_y + monitor_height - scaled_height - offset move_y = monitor_y + monitor_height - scaled_height - offset
command1 = f"dispatch resizewindowpixel exact {scaled_width} {scaled_height},address:{address}" command1 = self._make_resize_cmd(scaled_width, scaled_height, address)
command2 = f"dispatch movewindowpixel exact {int(move_x)} {int(move_y)},address:{address}" command2 = self._make_move_cmd(int(move_x), int(move_y), address)
hypr.batch(command1, command2) hypr.batch(command1, command2)
log_message( info(f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})")
f"Applied PiP action to window {address}: {scaled_width}x{scaled_height} at ({move_x}, {move_y})"
)
except Exception as e: except Exception as e:
log_message(f"ERROR: Failed to apply PiP action to window 0x{window_id}: {e}") error(f"failed to apply PiP action to window 0x{window_id}: {e}")
def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool: def _apply_window_actions(self, window_id: str, width: str, height: str, actions: list[str]) -> bool:
dispatch_commands = [] dispatch_commands = []
@@ -181,23 +199,23 @@ class Command:
if "float" in actions: if "float" in actions:
window_info = self._get_window_info(window_id) window_info = self._get_window_info(window_id)
if window_info and not window_info.get("floating", False): if window_info and not window_info.get("floating", False):
dispatch_commands.append(f"dispatch togglefloating address:0x{window_id}") dispatch_commands.append(self._make_float_cmd(f"0x{window_id}"))
if "pip" in actions: if "pip" in actions:
self._apply_pip_action(window_id) self._apply_pip_action(window_id)
return True return True
dispatch_commands.append(f"dispatch resizewindowpixel exact {width} {height},address:0x{window_id}") dispatch_commands.append(self._make_resize_cmd(width, height, f"0x{window_id}"))
if "center" in actions: if "center" in actions:
dispatch_commands.append("dispatch centerwindow") dispatch_commands.append(self._make_center_cmd())
try: try:
hypr.batch(*dispatch_commands) hypr.batch(*dispatch_commands)
log_message(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})") info(f"Applied actions to window 0x{window_id}: {width} x {height} ({', '.join(actions)})")
return True return True
except Exception as e: except Exception as e:
log_message(f"ERROR: Failed to apply window actions for window 0x{window_id}: {e}") error(f"failed to apply window actions for window 0x{window_id}: {e}")
return False return False
def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None: def _match_window_rule(self, window_title: str, initial_title: str) -> WindowRule | None:
@@ -216,7 +234,7 @@ class Command:
if re.search(rule.name, window_title): if re.search(rule.name, window_title):
return rule return rule
except re.error: except re.error:
log_message(f"ERROR: Invalid regex pattern in rule '{rule.name}'") warn(f"invalid regex pattern in rule '{rule.name}'")
return None return None
@@ -238,7 +256,7 @@ class Command:
window_id = window_id.lstrip(">") window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id): if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}") warn(f"invalid window ID format: {window_id}")
return return
window_info = self._get_window_info(window_id) window_info = self._get_window_info(window_id)
@@ -248,19 +266,19 @@ class Command:
window_title = window_info.get("title", "") window_title = window_info.get("title", "")
initial_title = window_info.get("initialTitle", "") initial_title = window_info.get("initialTitle", "")
log_message(f"DEBUG: Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'") log(f"Window 0x{window_id} - Title: '{window_title}' | Initial: '{initial_title}'")
rule = self._match_window_rule(window_title, initial_title) rule = self._match_window_rule(window_title, initial_title)
if rule: if rule:
if self._is_rate_limited(window_id): if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}") log(f"Rate limited: skipping window 0x{window_id}")
return return
log_message(f"Matched rule '{rule.name}' for window 0x{window_id}") info(f"Matched rule '{rule.name}' for window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window title event: {e}") warn(f"failed to parse window title event: {e}")
def _handle_open_event(self, event: str) -> None: def _handle_open_event(self, event: str) -> None:
try: try:
@@ -276,22 +294,22 @@ class Command:
window_id = window_id.lstrip(">") window_id = window_id.lstrip(">")
if not all(c in "0123456789abcdefABCDEF" for c in window_id): if not all(c in "0123456789abcdefABCDEF" for c in window_id):
log_message(f"ERROR: Invalid window ID format: {window_id}") warn(f"invalid window ID format: {window_id}")
return return
log_message(f"DEBUG: New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'") log(f"New window 0x{window_id} - Title: '{title}' | Class: '{window_class}'")
rule = self._match_window_rule(title, title) rule = self._match_window_rule(title, title)
if rule: if rule:
if self._is_rate_limited(window_id): if self._is_rate_limited(window_id):
log_message(f"Rate limited: skipping window 0x{window_id}") log(f"Rate limited: skipping window 0x{window_id}")
return return
log_message(f"Matched rule '{rule.name}' for new window 0x{window_id}") info(f"Matched rule '{rule.name}' for new window 0x{window_id}")
self._apply_window_actions(window_id, rule.width, rule.height, rule.actions) self._apply_window_actions(window_id, rule.width, rule.height, rule.actions)
except (IndexError, ValueError) as e: except (IndexError, ValueError) as e:
log_message(f"ERROR: Failed to parse window open event: {e}") warn(f"failed to parse window open event: {e}")
def run(self) -> None: def run(self) -> None:
if self.args.daemon: if self.args.daemon:
@@ -304,7 +322,7 @@ class Command:
): ):
self._run_active_mode() self._run_active_mode()
else: else:
print( info(
"Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode" "Resizer daemon - use --daemon to start, 'pip' for quick pip mode, or provide pattern, match_type, width, height, and actions for active mode"
) )
@@ -313,28 +331,27 @@ class Command:
try: try:
active_window_result = hypr.message("activewindow") active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"): if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found") error("no active window found")
return return
address = active_window_result.get("address", "") address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"): if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address") error("invalid window address")
return return
window_id = address[2:] # Remove "0x" prefix window_id = address[2:] # Remove "0x" prefix
window_title = active_window_result.get("title", "") window_title = active_window_result.get("title", "")
if not active_window_result.get("floating", False): if not active_window_result.get("floating", False):
print(f"Window '{window_title}' is not floating. PIP only works on floating windows.") warn(f"window '{window_title}' is not floating; PiP only works on floating windows.")
print("Try making it floating first with: hyprctl dispatch togglefloating")
return return
print(f"Applying PIP to active window: '{window_title}'") info(f"Applying PiP to active window: '{window_title}'")
self._apply_pip_action(window_id) self._apply_pip_action(window_id)
print("PIP applied successfully") info("PiP applied successfully")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply PIP to active window: {e}") error(f"failed to apply PiP to active window: {e}")
def _run_active_mode(self) -> None: def _run_active_mode(self) -> None:
try: try:
@@ -351,10 +368,10 @@ class Command:
matching_windows = self._find_matching_windows(temp_rule) matching_windows = self._find_matching_windows(temp_rule)
if not matching_windows: if not matching_windows:
print(f"No windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'") warn(f"no windows found matching pattern '{temp_rule.name}' with match type '{temp_rule.match_type}'")
return return
print(f"Found {len(matching_windows)} matching window(s)") info(f"Found {len(matching_windows)} matching window(s)")
# Apply rule to all matching windows # Apply rule to all matching windows
success_count = 0 success_count = 0
@@ -362,41 +379,41 @@ class Command:
window_id = window["address"][2:] # Remove "0x" prefix window_id = window["address"][2:] # Remove "0x" prefix
window_title = window.get("title", "") window_title = window.get("title", "")
print(f"Applying rule to window 0x{window_id}: '{window_title}'") info(f"Applying rule to window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success: if success:
success_count += 1 success_count += 1
print(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows") info(f"Successfully applied rule to {success_count}/{len(matching_windows)} windows")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply rule: {e}") error(f"failed to apply rule: {e}")
def _apply_to_active_window(self, temp_rule: WindowRule) -> None: def _apply_to_active_window(self, temp_rule: WindowRule) -> None:
"""Apply rule only to the currently active window""" """Apply rule only to the currently active window"""
try: try:
active_window_result = hypr.message("activewindow") active_window_result = hypr.message("activewindow")
if not isinstance(active_window_result, dict) or not active_window_result.get("address"): if not isinstance(active_window_result, dict) or not active_window_result.get("address"):
print("ERROR: No active window found") error("no active window found")
return return
window_title = active_window_result.get("title", "") window_title = active_window_result.get("title", "")
address = active_window_result.get("address", "") address = active_window_result.get("address", "")
if not isinstance(address, str) or not address.startswith("0x"): if not isinstance(address, str) or not address.startswith("0x"):
print("ERROR: Invalid window address") error("invalid window address")
return return
window_id = address[2:] # Remove "0x" prefix window_id = address[2:] # Remove "0x" prefix
print(f"Applying rule to active window 0x{window_id}: '{window_title}'") info(f"Applying rule to active window 0x{window_id}: '{window_title}'")
success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions) success = self._apply_window_actions(window_id, temp_rule.width, temp_rule.height, temp_rule.actions)
if success: if success:
print("Rule applied successfully") info("Rule applied successfully")
else: else:
print("Failed to apply rule") error("failed to apply rule")
except Exception as e: except Exception as e:
print(f"ERROR: Failed to apply rule to active window: {e}") error(f"failed to apply rule to active window: {e}")
def _find_matching_windows(self, temp_rule: WindowRule) -> list: def _find_matching_windows(self, temp_rule: WindowRule) -> list:
"""Find all windows that match the given rule pattern""" """Find all windows that match the given rule pattern"""
@@ -425,7 +442,7 @@ class Command:
try: try:
matches = bool(re.search(temp_rule.name, window_title)) matches = bool(re.search(temp_rule.name, window_title))
except re.error: except re.error:
print(f"ERROR: Invalid regex pattern '{temp_rule.name}'") warn(f"invalid regex pattern '{temp_rule.name}'")
return [] return []
if matches: if matches:
@@ -434,23 +451,22 @@ class Command:
return matching_windows return matching_windows
except Exception as e: except Exception as e:
print(f"ERROR: Failed to find matching windows: {e}") error(f"failed to find matching windows: {e}")
return [] return []
def _run_daemon(self) -> None: def _run_daemon(self) -> None:
log_message("Hyprland window resizer started") info("Hyprland window resizer started")
log_message(f"Loaded {len(self.window_rules)} window rules") info(f"Loaded {len(self.window_rules)} window rules")
socket_path = Path(hypr.socket2_path) socket_path = Path(hypr.socket2_path)
if not socket_path.exists(): if not socket_path.exists():
log_message(f"ERROR: Hyprland socket not found at {socket_path}") fatal(f"Hyprland socket not found at {socket_path}")
return
try: try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
sock.connect(hypr.socket2_path) sock.connect(hypr.socket2_path)
log_message("Connected to Hyprland socket, listening for events...") info("Connected to Hyprland socket, listening for events...")
while True: while True:
data = sock.recv(4096).decode() data = sock.recv(4096).decode()
@@ -460,6 +476,6 @@ class Command:
self._handle_window_event(line) self._handle_window_event(line)
except KeyboardInterrupt: except KeyboardInterrupt:
log_message("Resizer daemon stopped") info("Resizer daemon stopped")
except Exception as e: except Exception as e:
log_message(f"ERROR: {e}") error(str(e))
+7 -1
View File
@@ -2,6 +2,7 @@ import subprocess
from argparse import Namespace from argparse import Namespace
from datetime import datetime from datetime import datetime
from caelestia.utils import hypr
from caelestia.utils.notify import notify from caelestia.utils.notify import notify
from caelestia.utils.paths import screenshots_cache_dir, screenshots_dir from caelestia.utils.paths import screenshots_cache_dir, screenshots_dir
@@ -33,7 +34,12 @@ class Command:
swappy.stdin.close() swappy.stdin.close()
def fullscreen(self) -> None: def fullscreen(self) -> None:
sc_data = subprocess.check_output(["grim", "-"]) cmd = ["grim"]
focused_monitor = next(monitor for monitor in hypr.message("monitors") if monitor["focused"])
if focused_monitor:
cmd += ["-o", focused_monitor["name"]]
cmd += ["-"]
sc_data = subprocess.check_output(cmd)
subprocess.run(["wl-copy"], input=sc_data) subprocess.run(["wl-copy"], input=sc_data)
+4 -4
View File
@@ -6,7 +6,7 @@ from collections import ChainMap
from typing import Any, Callable, cast from typing import Any, Callable, cast
from caelestia.utils import hypr from caelestia.utils import hypr
from caelestia.utils.paths import user_config_path from caelestia.utils.paths import get_config
def is_subset(superset, subset): def is_subset(superset, subset):
@@ -103,8 +103,8 @@ class Command:
}, },
} }
try: try:
self.cfg = DeepChainMap(json.loads(user_config_path.read_text())["toggles"], self.cfg) self.cfg = DeepChainMap(get_config()["toggles"], self.cfg)
except (FileNotFoundError, json.JSONDecodeError, KeyError): except KeyError:
pass pass
def run(self) -> None: def run(self) -> None:
@@ -135,7 +135,7 @@ class Command:
if (spawn[0].endswith(".desktop") or shutil.which(spawn[0])) and not any( if (spawn[0].endswith(".desktop") or shutil.which(spawn[0])) and not any(
selector(client) for client in self.get_clients() selector(client) for client in self.get_clients()
): ):
hypr.dispatch("exec", f"[workspace special:{self.args.workspace}] app2unit -- {shlex.join(spawn)}") hypr.dispatch("exec", f"[workspace special:{self.args.workspace}] {shlex.join(spawn)}")
return True return True
else: else:
return False return False
+260
View File
@@ -0,0 +1,260 @@
import sys
from argparse import Namespace
from pathlib import Path
from caelestia.utils.dots.deployer import Deployer
from caelestia.utils.dots.diff import Changeset
from caelestia.utils.dots.manifest import ComponentError, Manifest, ManifestError
from caelestia.utils.dots.misc import build_local_packages, run_hooks
from caelestia.utils.dots.packages import PackageError, PackageInstaller
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.dots.state import DotsState
from caelestia.utils.io import disable_input, fatal, info, log, prompt_selection, warn
class Command:
args: Namespace
def __init__(self, args: Namespace) -> None:
self.args = args
def run(self) -> None:
if self.args.noconfirm:
disable_input()
state = DotsState.load()
if state.applied_rev is None:
fatal("dots not installed yet. Run `caelestia install` first.")
# Run system update
try:
installer = PackageInstaller.get(self.args.aur_helper or state.aur_helper, self.args.noconfirm)
installer.system_update()
except PackageError as e:
fatal(e)
# Get manifest or exit if up to date
source, tip, manifest = self.fetch_manifest(state, state.applied_rev)
# Apply file changes
entries = manifest.enabled_entries()
try:
changeset = Changeset.compute(source, state.applied_rev, tip, entries, state.deployed_files)
source.checkout_tip()
except SourceError as e:
fatal(e)
new_files, revived_files, placed = self.deploy_changeset(source, changeset)
# Persist file changes immediately so a later failure can't lose track of them
deployed = dict(state.deployed_files)
for dest in (*changeset.deletes, *changeset.stale, *changeset.untracked):
deployed.pop(str(dest), None)
for repofile, dest in changeset.remap:
deployed[str(dest)] = repofile
deployed.update(placed)
state.deployed_files = deployed
state.save()
# Install new/remove old packages
desired = manifest.enabled_packages()
desired_local = manifest.enabled_local_packages()
try:
state.packages = self.sync_packages(installer, state.packages, desired)
state.save()
state.local_packages = self.sync_local_packages(installer, source, state.local_packages, desired_local)
state.save()
except PackageError as e:
fatal(e)
# Run hooks
run_hooks(manifest, "post_update")
# Mark the new revision applied
state.applied_rev = tip
state.enabled_components = manifest.enabled_components
state.aur_helper = getattr(installer, "helper", state.aur_helper)
state.save()
self.summarize(changeset, new_files, revived_files)
def fetch_manifest(self, state: DotsState, applied_rev: str) -> tuple[DotsSource, str, Manifest]:
print()
log("Fetching dots repo...")
source = DotsSource()
try:
source.ensure()
tip = source.tip_rev()
if tip == applied_rev:
info("Dots already up to date.")
sys.exit(0)
manifest = source.manifest_at(tip)
if source.has_rev(applied_rev):
known = set(source.manifest_at(applied_rev).components)
else:
# Treat all components as known if rev is invalid so we don't overwrite existing prefs
known = set(manifest.components)
except (SourceError, ManifestError) as e:
fatal(e)
# Enable components recorded at install time + any new components that are default on
enabled = [
name
for name, comp in manifest.components.items()
if name in state.enabled_components or (name not in known and comp.default)
]
# Let the user opt into any new optional components
new_comps = [name for name, comp in manifest.components.items() if name not in known and not comp.default]
if new_comps:
info(f"New components: {', '.join(new_comps)}")
enabled += prompt_selection(new_comps, "Components to enable?")
disabled = [name for name in manifest.components if name not in enabled]
try:
manifest.resolve_components(enable=enabled, disable=disabled)
except ComponentError as e:
fatal(e)
info(f"Enabled components: {', '.join(enabled) or 'none'}")
return source, tip, manifest
def deploy_changeset(
self, source: DotsSource, changeset: Changeset
) -> tuple[list[Path], list[Path], dict[str, str]]:
print()
if changeset.is_empty():
info("No configs to update.")
return [], [], {}
log("Updating configs...")
deployer = Deployer()
for repofile, dest in changeset.place:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
deployer.place_file(src, dest)
info(f"{repofile} -> {dest}")
new_files = []
for repofile, dest in changeset.conflicts:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
new_path = deployer.write_new(src, dest)
new_files.append(new_path)
warn(f"{dest} has local changes; upstream version written as {new_path.name}")
revived_files = []
for repofile, dest in changeset.deleted_changed:
src = source.working_path(repofile)
if not src.exists():
warn(f"missing in source, skipping: {repofile}")
continue
new_path = deployer.write_new(src, dest)
revived_files.append(new_path)
warn(f"{dest} was removed but changed upstream; upstream version written as {new_path.name}")
for dest in changeset.deletes:
deployer.remove(dest)
deployer.prune_empty_dirs(dest, Path.home())
info(f"Removed {dest}")
return new_files, revived_files, deployer.deployed_files
def sync_packages(self, installer: PackageInstaller, current: list[str], desired: list[str]) -> list[str]:
to_install = [p for p in desired if p not in current]
to_remove = [p for p in current if p not in desired]
installed = list(current)
if to_install:
print()
info(f"Installing new packages: {', '.join(to_install)}")
installer.install(to_install)
installed.extend(p for p in to_install if p not in installed)
if to_remove:
print()
info(f"Packages no longer required: {', '.join(to_remove)}")
selected = prompt_selection(to_remove, "Packages to remove?")
if selected:
installer.remove(selected)
installed = [p for p in installed if p not in selected]
return installed
def sync_local_packages(
self, installer: PackageInstaller, source: DotsSource, current: dict[str, list[str]], desired: list[str]
) -> dict[str, list[str]]:
to_build = [p for p in desired if p not in current]
to_rebuild = self.outdated_local_packages(installer, source, current, desired)
to_remove = [p for p in current if p not in desired]
installed = dict(current)
if to_build:
print()
log(f"Building new local packages: {', '.join(to_build)}")
installed.update(build_local_packages(installer, source, to_build))
if to_rebuild:
print()
log(f"Rebuilding updated local packages: {', '.join(to_rebuild)}")
installed.update(build_local_packages(installer, source, to_rebuild))
if to_remove:
print()
info(f"Local packages no longer required: {', '.join(to_remove)}")
selected = prompt_selection(to_remove, "Local packages to remove?")
if selected:
installer.remove([pkg for path in selected for pkg in current[path]])
for path in selected:
installed.pop(path, None)
return installed
def outdated_local_packages(
self, installer: PackageInstaller, source: DotsSource, current: dict[str, list[str]], desired: list[str]
) -> list[str]:
"""Repo paths whose installed packages are older than what the repo would build (skipped when off Arch)."""
outdated = []
for path in desired:
if path not in current:
continue
directory = source.working_path(path)
if not directory.is_dir():
continue
try:
if installer.needs_rebuild(directory, current[path]):
outdated.append(path)
except PackageError as e:
# Failed to read PKGBUILD, leave it as-is
warn(f"could not check {path} for updates, leaving as-is: {e}")
return outdated
def summarize(self, changeset: Changeset, new_files: list[Path], revived_files: list[Path]) -> None:
print()
conflicts = len(new_files) + len(revived_files)
info(f"Updated {len(changeset.place)} file(s), removed {len(changeset.deletes)}, {conflicts} conflict(s).")
if new_files:
info("The following files were changed upstream but you had edited them locally.")
info("Your versions were kept; the upstream versions were written alongside as .new:")
for path in new_files:
info(f" {path}")
if revived_files:
info("These files were removed by you but changed upstream, so were not restored.")
info("The upstream versions were written alongside as .new:")
for path in revived_files:
info(f" {path}")
if changeset.stale:
info("These files are no longer managed but differ from what was installed, so were kept:")
for path in changeset.stale:
info(f" {path}")
+84
View File
@@ -0,0 +1,84 @@
import shutil
import tempfile
from pathlib import Path
from caelestia.utils.paths import cache_dir, config_dir, data_dir, dots_dir, state_dir
# Dirs to never prune even if empty
_PROTECTED_DIRS = frozenset({Path.home(), config_dir, data_dir, state_dir, cache_dir})
class Deployer:
"""Places files from the dots clone into their destinations."""
def __init__(self):
self.deployed_files: dict[str, str] = {}
def place(self, src: Path, dest: Path) -> None:
"""Place a whole entry (file or directory tree), replacing any existing dest."""
if src.is_dir():
self.place_dir(src, dest)
else:
self.place_file(src, dest)
def place_dir(self, src: Path, dest: Path) -> None:
"""Place a directory tree recursively, overwriting any existing dest files."""
if dest.is_symlink() or dest.is_file():
self.remove(dest)
dest.mkdir(parents=True, exist_ok=True)
for path in src.rglob("*"):
if path.is_file():
self.place_file(path, dest / path.relative_to(src))
elif path.is_dir():
(dest / path.relative_to(src)).mkdir(parents=True, exist_ok=True)
def place_file(self, src: Path, dest: Path, record: bool = True) -> None:
"""Atomically place a single file, replacing any existing dest."""
if dest.is_dir() and not dest.is_symlink():
self.remove(dest)
dest.parent.mkdir(parents=True, exist_ok=True)
f = tempfile.NamedTemporaryFile(dir=dest.parent, delete=False)
f.close()
try:
shutil.copyfile(src, f.name)
shutil.copymode(src, f.name)
Path(f.name).replace(dest)
except BaseException:
Path(f.name).unlink()
raise
if record:
# Keep relative to dots dir
self.deployed_files[str(dest)] = str(src.relative_to(dots_dir))
def write_new(self, src: Path, dest: Path) -> Path:
"""Write the upstream version alongside dest as <dest>.new and return that path."""
new_path = dest.parent / f"{dest.name}.new"
self.place_file(src, new_path, record=False)
return new_path
def remove(self, path: Path) -> None:
if path.is_symlink() or path.is_file():
path.unlink()
elif path.is_dir():
shutil.rmtree(path)
def prune_empty_dirs(self, start: Path, stop: Path) -> None:
"""Removes dirs recursively from start to stop.
Will never prune protected dirs (home, config, cache, etc).
"""
parent = start.parent
while parent != stop and stop in parent.parents and parent not in _PROTECTED_DIRS:
try:
parent.rmdir()
except OSError:
break
parent = parent.parent
+153
View File
@@ -0,0 +1,153 @@
from dataclasses import dataclass, field
from pathlib import Path
from caelestia.utils.dots.manifest import ManifestEntry
from caelestia.utils.dots.source import DotsSource, SourceError
from caelestia.utils.io import warn
class _Continue(Exception):
"""Signals the deployed-files loop to skip to the next entry."""
def _read_local(path: Path) -> bytes | None:
"""Read a local file, returning None if it can't be read (perms, is a dir, etc.)."""
try:
return path.read_bytes()
except OSError:
return None
@dataclass(frozen=True)
class Changeset:
place: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) to fast-forward
conflicts: list[tuple[str, Path]] = field(default_factory=list) # (repofile, dest) -> write .new
deletes: list[Path] = field(default_factory=list) # We placed it, upstream removed it, unmodified
stale: list[Path] = field(default_factory=list) # Upstream removed it but user modified it
deleted_changed: list[tuple[str, Path]] = field(default_factory=list) # User deleted it, upstream changed -> .new
untracked: list[Path] = field(default_factory=list) # Gone + no longer managed; drop from state
remap: list[tuple[str, Path]] = field(default_factory=list) # Up to date but source path moved; restate mapping
def is_empty(self) -> bool:
return not (self.place or self.conflicts or self.deletes or self.stale or self.deleted_changed)
@staticmethod
def compute(
source: DotsSource,
applied_rev: str,
tip: str,
entries: list[ManifestEntry],
deployed: dict[str, str],
) -> "Changeset":
"""Collect all file changes needed into a Changeset."""
has_base = source.has_rev(applied_rev)
if not has_base:
warn(
"the previously applied revision is missing from the dots clone; files that differ "
"from the latest version will be written as .new instead of updated in place."
)
changed = set(source.changed_files(applied_rev, tip)) if has_base else set()
place: list[tuple[str, Path]] = []
conflicts: list[tuple[str, Path]] = []
deletes: list[Path] = []
stale: list[Path] = []
deleted_changed: list[tuple[str, Path]] = []
untracked: list[Path] = []
remap: list[tuple[str, Path]] = []
# Collect all files to deploy (entry sources can be dirs so we recurse into them)
to_deploy: dict[Path, str] = {}
for entry in entries:
src_root = str(entry.expanded_src())
repo_files = source.files_at(tip, src_root)
for dest in entry.expanded_dests():
for repo_file in repo_files:
to_deploy[dest / Path(repo_file).relative_to(src_root)] = repo_file
files_to_deploy = set(to_deploy)
# Already deployed files
for dest, src in deployed.items():
dest_path = Path(dest)
def try_read(rev: str, path: str) -> bytes:
try:
return source.blob_at(rev, path)
except SourceError:
# Read failed, keep it just in case
stale.append(dest_path)
raise _Continue
try:
if dest_path not in files_to_deploy: # No longer managed by any entry
if not dest_path.exists():
# Gone from disk and no entry manages it
untracked.append(dest_path)
continue
local = _read_local(dest_path)
if local is not None and has_base and try_read(applied_rev, src) == local:
deletes.append(dest_path)
else:
# Modified, or unreadable so we can't verify; keep it just in case
stale.append(dest_path)
else: # Still managed; `src` is what we last placed, `new_src` the current source
new_src = to_deploy[dest_path]
if not dest_path.exists():
# User deleted a managed file locally
if has_base and new_src == src and new_src not in changed:
continue # Respect the deletion; upstream has nothing new to offer
# Upstream changed it (or base is unknown): surface as .new, don't restore
deleted_changed.append((new_src, dest_path))
continue
if has_base and new_src == src and new_src not in changed:
continue # Unchanged upstream
dest_content = _read_local(dest_path)
if dest_content is None:
# Unreadable (perms, became a dir, ...); surface upstream as .new, don't clobber
conflicts.append((new_src, dest_path))
continue
if try_read(tip, new_src) == dest_content:
# Already up to date; restate the mapping if the source path moved
if new_src != src:
remap.append((new_src, dest_path))
continue
# Fast-forward only when the user hasn't edited since last deploy
if has_base and try_read(applied_rev, src) == dest_content:
place.append((new_src, dest_path))
else:
conflicts.append((new_src, dest_path))
except _Continue:
continue
# New files to deploy
for dest in files_to_deploy - set(Path(d) for d in deployed):
src = to_deploy[dest]
try:
new_content = source.blob_at(tip, src)
except SourceError:
# Failed to read the upstream blob; skip rather than abort the whole update
warn(f"could not read from source, skipping: {src}")
continue
if not dest.exists() or new_content == _read_local(dest):
# Dest nonexistent or already equal to new content
place.append((src, dest))
else:
# Differs, or exists but unreadable; surface upstream as .new
conflicts.append((src, dest))
return Changeset(
place=place,
conflicts=conflicts,
deletes=deletes,
stale=stale,
deleted_changed=deleted_changed,
untracked=untracked,
remap=remap,
)
+94
View File
@@ -0,0 +1,94 @@
import subprocess
from pathlib import Path
from caelestia.utils.paths import config_dir, data_dir
LEGACY_META_PKG = "caelestia-meta"
_confs = [
"hypr",
"starship.toml",
"foot",
"fish",
"fastfetch",
"uwsm",
"btop",
"spicetify",
"Code/User/settings.json",
"VSCodium/User/settings.json",
"Code/User/keybindings.json",
"VSCodium/User/keybindings.json",
"code-flags.conf",
"codium-flags.conf",
]
def _find_legacy_repo(path: Path) -> Path | None:
try:
remote = subprocess.check_output(["git", "-C", path, "remote", "get-url", "origin"], text=True)
except subprocess.CalledProcessError:
return
# Check remote
if remote.strip() != "https://github.com/caelestia-dots/caelestia.git":
return
# Ignore anything outside home
if Path.home() not in path.parents:
return
# Walk up parents (capped at home) to find the repo root
while path != Path.home() and not (path / ".git").is_dir():
path = path.parent
# Only return path if didn't hit home (we really don't want to nuke home)
if path != Path.home():
return path
def _filter_candidates(candidates: list[Path], legacy_dir: Path) -> list[Path]:
return [path for path in candidates if path.is_symlink() and legacy_dir in path.resolve().parents]
def detect_legacy_repo() -> Path | None:
for conf in _confs:
path = config_dir / conf
if not path.is_symlink():
continue
legacy_dir = _find_legacy_repo(path.resolve())
if legacy_dir:
return legacy_dir
return _find_legacy_repo(data_dir / "caelestia")
def legacy_config_symlinks(base: Path, legacy_dir: Path | None) -> list[Path]:
"""Config-relative links install.fish created, resolved under `base` (the live config or a backup of it)."""
if not legacy_dir:
return []
candidates = [base / conf for conf in _confs]
return _filter_candidates(candidates, legacy_dir)
def legacy_symlinks(legacy_dir: Path | None) -> list[Path]:
"""All paths symlinked into the legacy repo (the links install.fish created)."""
if not legacy_dir:
return []
extras = [
*(Path.home() / ".zen").glob("*/chrome/userChrome.css"),
Path.home() / ".local/lib/caelestia/caelestiafox",
]
return [*legacy_config_symlinks(config_dir, legacy_dir), *_filter_candidates(extras, legacy_dir)]
def legacy_to_delete(legacy_dir: Path | None) -> list[Path]:
if not legacy_dir:
return []
return [*legacy_symlinks(legacy_dir), legacy_dir]
+231
View File
@@ -0,0 +1,231 @@
import glob
import os
import re
import tomllib
from dataclasses import dataclass, field
from pathlib import Path
from string import Template
from typing import Any
from caelestia.utils.io import warn
_XDG_DEFAULTS = {
"XDG_CONFIG_HOME": str(Path.home() / ".config"),
"XDG_DATA_HOME": str(Path.home() / ".local/share"),
"XDG_STATE_HOME": str(Path.home() / ".local/state"),
"XDG_CACHE_HOME": str(Path.home() / ".cache"),
}
_GLOB_MAGIC = re.compile(r"[*?[]")
_LOCAL_PREFIX = "local:"
class ManifestError(Exception):
"""Raised when manifest.toml is malformed."""
class ComponentError(Exception):
"""Raised when component flags are invalid or contradictory."""
def _expand(text: str) -> Path:
"""Expand $VAR/${VAR} env vars (with XDG defaults) and ~ in a path."""
env = {**_XDG_DEFAULTS, **os.environ}
return Path(Template(text).safe_substitute(env)).expanduser()
@dataclass(frozen=True)
class ManifestEntry:
src: str
dest: str
def expanded_src(self) -> Path:
return _expand(self.src)
def expanded_dests(self) -> list[Path]:
"""The dest path with globs expanded.
Globs from the start until the segment with the last glob so subdirs are
created if they didn't exist previously.
"""
expanded = _expand(self.dest)
if not _GLOB_MAGIC.search(str(expanded)):
return [expanded]
parts = expanded.parts
glob_idx = max(i for i, part in enumerate(parts) if _GLOB_MAGIC.search(part))
pattern = str(Path(*parts[: glob_idx + 1]))
tail = parts[glob_idx + 1 :]
matches = sorted(glob.glob(pattern))
if tail: # Only match dirs if a tail exists
matches = [match for match in matches if Path(match).is_dir()]
return [Path(match, *tail) for match in matches]
@dataclass(frozen=True)
class ManifestComponent:
name: str
default: bool = False
packages: list[str] = field(default_factory=list)
entries: list[ManifestEntry] = field(default_factory=list)
post_package: list[str] = field(default_factory=list)
post_install: list[str] = field(default_factory=list)
post_update: list[str] = field(default_factory=list)
@dataclass
class _ManifestData:
enabled_comps: list[str] = field(default_factory=list)
disabled_comps: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class Manifest:
components: dict[str, ManifestComponent] = field(default_factory=dict)
packages: list[str] = field(default_factory=list)
post_package: list[str] = field(default_factory=list) # Post package install (install cmd only)
post_install: list[str] = field(default_factory=list) # Very end of install cmd
post_update: list[str] = field(default_factory=list) # Very end of update cmd
_data: _ManifestData = field(default_factory=_ManifestData, init=False, repr=False)
@property
def enabled_components(self) -> list[str]:
return self._data.enabled_comps
@property
def disabled_components(self) -> list[str]:
return self._data.disabled_comps
@staticmethod
def parse(text: str) -> "Manifest":
try:
raw = tomllib.loads(text)
except tomllib.TOMLDecodeError as e:
raise ManifestError(f"invalid TOML: {e}") from e
post_package = _validate_str_list(raw.get("post_package", []), "post_package")
post_install = _validate_str_list(raw.get("post_install", []), "post_install")
post_update = _validate_str_list(raw.get("post_update", []), "post_update")
packages = _validate_str_list(raw.get("packages", []), "packages")
components = {}
for comp in raw.get("components", []):
parsed = _parse_component(comp)
if parsed.name in components:
warn(f"duplicate component '{parsed.name}'; using the last definition")
components[parsed.name] = parsed
return Manifest(
components=components,
packages=packages,
post_package=post_package,
post_install=post_install,
post_update=post_update,
)
def resolve_components(
self,
enable: list[str] | None = None,
disable: list[str] | None = None,
) -> None:
"""Resolves enabled/disabled components. This MUST be called before calling any other method."""
enable_set = set(enable or [])
disable_set = set(disable or [])
known = set(self.components)
for name in enable_set | disable_set:
if name not in known:
raise ComponentError(f"unknown component: {name}")
conflict = enable_set & disable_set
if conflict:
raise ComponentError(f"component(s) both enabled and disabled: {', '.join(sorted(conflict))}")
enabled = {name for name, comp in self.components.items() if comp.default}
enabled |= enable_set
enabled -= disable_set
self._data.enabled_comps.clear()
self._data.disabled_comps.clear()
for name in self.components:
if name in enabled:
self._data.enabled_comps.append(name)
else:
self._data.disabled_comps.append(name)
def enabled_entries(self) -> list[ManifestEntry]:
"""The entries of every enabled component."""
entries: list[ManifestEntry] = []
for name in self._data.enabled_comps:
entries.extend(self.components[name].entries)
return entries
def enabled_hooks(self, kind: str) -> list[str]:
"""Global + enabled components' hooks of the given kind."""
hooks = list(getattr(self, kind))
for name in self._data.enabled_comps:
hooks.extend(getattr(self.components[name], kind))
return hooks
def enabled_packages(self) -> list[str]:
"""Repo/AUR packages to install."""
return [p for p in self._all_packages() if not p.startswith(_LOCAL_PREFIX)]
def enabled_local_packages(self) -> list[str]:
"""Local PKGBUILD dirs to build.
Local packages are determined by a local: prefix and are
relative dirs instead of package names.
"""
return [p[len(_LOCAL_PREFIX) :] for p in self._all_packages() if p.startswith(_LOCAL_PREFIX)]
def _all_packages(self) -> list[str]:
"""The manifest's top-level packages plus enabled components', in manifest order.
Top-level packages come first, then each enabled component's packages in
component order. Only the first occurrence of each package is kept.
"""
seen: set[str] = set()
ordered: list[str] = []
for pkg in (*self.packages, *(p for c in self._data.enabled_comps for p in self.components[c].packages)):
if pkg not in seen:
seen.add(pkg)
ordered.append(pkg)
return ordered
def _require_key(d: dict[str, Any], key: str, ctx: str) -> Any:
if key not in d:
raise ManifestError(f"{ctx}: missing required key '{key}'")
return d[key]
def _validate_str_list(value: Any, ctx: str) -> list[str]:
if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
raise ManifestError(f"{ctx}: expected a list of strings")
return value
def _parse_entry(d: Any) -> ManifestEntry:
if not isinstance(d, dict):
raise ManifestError("entry: expected a table")
return ManifestEntry(src=_require_key(d, "src", "entry"), dest=_require_key(d, "dest", "entry"))
def _parse_component(d: dict[str, Any]) -> ManifestComponent:
name = _require_key(d, "name", "component")
return ManifestComponent(
name=name,
default=bool(d.get("default", False)),
packages=_validate_str_list(d.get("packages", []), f"component '{name}' packages"),
entries=[_parse_entry(e) for e in d.get("entries", [])],
post_package=_validate_str_list(d.get("post_package", []), f"component '{name}' post_package"),
post_install=_validate_str_list(d.get("post_install", []), f"component '{name}' post_install"),
post_update=_validate_str_list(d.get("post_update", []), f"component '{name}' post_update"),
)
+39
View File
@@ -0,0 +1,39 @@
import os
import subprocess
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.dots.packages import PackageInstaller
from caelestia.utils.dots.source import DotsSource
from caelestia.utils.io import info, log, warn
from caelestia.utils.paths import dots_dir
def build_local_packages(installer: PackageInstaller, source: DotsSource, paths: list[str]) -> dict[str, list[str]]:
"""Build and install each local PKGBUILD dir, returning {path: installed package names}."""
built: dict[str, list[str]] = {}
for path in paths:
directory = source.working_path(path)
if not directory.is_dir():
warn(f"missing in repo, skipping: {path}")
continue
log(f"Building {path}...")
built[path] = installer.build_install(directory)
return built
def run_hooks(manifest: Manifest, kind: str) -> None:
"""Run the global + enabled components' hooks of the given kind (e.g. "post_install")."""
hooks = manifest.enabled_hooks(kind)
if not hooks:
return
print()
log(f"Running {kind.replace('_', '-')} hooks...")
env = {**os.environ, "CAELESTIA_DOTS": str(dots_dir)}
for hook in hooks:
info(f"Running hook: {hook}")
result = subprocess.run(hook, shell=True, env=env)
if result.returncode != 0:
warn(f"hook exited with {result.returncode}")
+260
View File
@@ -0,0 +1,260 @@
import os
import shutil
import subprocess
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from caelestia.utils.io import fatal, info, warn
DEFAULT_AUR_HELPER = "paru"
AUR_HELPERS = DEFAULT_AUR_HELPER, "yay"
class PackageError(Exception):
"""Raised when a package operation (install/remove/build/update) fails."""
def _try_run(cmd: list[str], error_msg: str, **kwargs) -> None:
"""Run a subprocess, raising `PackageError` if it fails."""
try:
subprocess.run(cmd, check=True, **kwargs)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PackageError(error_msg) from e
def _read_srcinfo(directory: Path) -> dict[str, list[str]]:
"""Run `makepkg --printsrcinfo` in `directory`, grouping each key to its list of values."""
try:
srcinfo = subprocess.check_output(["makepkg", "--printsrcinfo"], cwd=directory, text=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
raise PackageError(f"failed to read package metadata in {directory}") from e
fields: dict[str, list[str]] = {}
for line in srcinfo.splitlines():
key, sep, value = line.partition("=")
if not sep:
continue
fields.setdefault(key.strip(), []).append(value.strip())
return fields
def _srcinfo_version(fields: dict[str, list[str]]) -> str | None:
"""Build the `[epoch:]pkgver-pkgrel` version string from parsed .SRCINFO fields, or None if absent."""
pkgver = next(iter(fields.get("pkgver", [])), None)
pkgrel = next(iter(fields.get("pkgrel", [])), None)
if pkgver is None or pkgrel is None:
return None
version = f"{pkgver}-{pkgrel}"
epoch = next(iter(fields.get("epoch", [])), None)
return f"{epoch}:{version}" if epoch else version
def _vercmp(a: str, b: str) -> int:
"""Use pacman's `vercmp` to compare to package versions."""
try:
return int(subprocess.check_output(["vercmp", a, b], text=True).strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
warn(f"vercmp failed, assuming equal: {e}")
return 0 # Don't rebuild when unable to check version
def _install_aur_helper(helper: str, noconfirm: bool = False) -> None:
pacman_cmd = ["sudo", "pacman", "-S", "--needed", "git", "base-devel"]
if noconfirm:
pacman_cmd.append("--noconfirm")
_try_run(pacman_cmd, "failed to install AUR helper build dependencies")
repo_url = f"https://aur.archlinux.org/{helper}.git"
with tempfile.TemporaryDirectory() as repo_dir:
_try_run(["git", "clone", repo_url, repo_dir], f"failed to clone {helper} from the AUR")
makepkg_cmd = ["makepkg", "-si"]
if noconfirm:
makepkg_cmd.append("--noconfirm")
_try_run(makepkg_cmd, f"failed to build and install {helper}", cwd=repo_dir)
try:
if helper == "yay":
subprocess.run(["yay", "-Y", "--gendb"], check=True)
subprocess.run(["yay", "-Y", "--devel", "--save"], check=True)
elif helper == "paru":
subprocess.run(["paru", "--gendb"], check=True)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
warn(f"failed to run AUR helper post install actions: {e}")
class PackageInstaller(ABC):
@staticmethod
def get(helper: str | None = None, noconfirm: bool = False) -> "PackageInstaller":
"""Pick a package installer: the requested/detected AUR helper on Arch, else a no-op."""
# Not on Arch, can't install packages
if shutil.which("pacman") is None:
return NoopInstaller()
# Explicitly given
if helper:
if not shutil.which(helper):
if helper not in AUR_HELPERS:
fatal(f"given AUR helper {helper} is not installed and is unable to be installed automatically.")
info(f"Given AUR helper not installed. Installing {helper}...")
_install_aur_helper(helper, noconfirm)
return ArchInstaller(helper, noconfirm)
# Not given, find installed one
for candidate in AUR_HELPERS:
if shutil.which(candidate):
return ArchInstaller(candidate, noconfirm)
info(f"No AUR helper found. Installing {DEFAULT_AUR_HELPER}...")
_install_aur_helper(DEFAULT_AUR_HELPER, noconfirm)
return ArchInstaller(DEFAULT_AUR_HELPER, noconfirm)
# --- Abstract methods ---
@abstractmethod
def install(self, packages: list[str]) -> None: ...
@abstractmethod
def remove(self, packages: list[str]) -> None: ...
@abstractmethod
def build_install(self, directory: Path) -> list[str]:
"""Build and install the PKGBUILD in `directory`, returning the installed package names."""
@abstractmethod
def installed_version(self, package: str) -> str | None:
"""Return the installed version of `package`, or None if it is not installed."""
def is_installed(self, package: str) -> bool:
return self.installed_version(package) is not None
@abstractmethod
def needs_rebuild(self, directory: Path, packages: list[str]) -> bool:
"""Whether the PKGBUILD in `directory` would build a version differing from the installed `packages`."""
@abstractmethod
def system_update(self) -> None: ...
class NoopInstaller(PackageInstaller):
"""Used off Arch, where the dots' packages are not available via pacman/AUR."""
def install(self, packages: list[str]) -> None:
if packages:
info(f"Skipping package install (not on Arch): {', '.join(packages)}")
def remove(self, packages: list[str]) -> None:
if packages:
info(f"Skipping package removal (not on Arch): {', '.join(packages)}")
def build_install(self, directory: Path) -> list[str]:
info(f"Skipping local package build (not on Arch): {directory}")
return []
def installed_version(self, package: str) -> str | None:
return None
def needs_rebuild(self, directory: Path, packages: list[str]) -> bool:
return False
def system_update(self) -> None:
info("Skipping system update (not on Arch)")
class ArchInstaller(PackageInstaller):
def __init__(self, helper: str, noconfirm: bool = False) -> None:
self.helper = helper
self.flags = ["--noconfirm"] if noconfirm else []
def install(self, packages: list[str], explicit: bool = True) -> None:
if not packages:
return
cmd = [self.helper, "-S", "--needed", *self.flags]
if not explicit:
cmd.append("--asdeps") # Set install reason to dep (does not affect already installed packages)
_try_run(cmd + packages, f"failed to install packages: {', '.join(packages)}")
# Force install reason to explicit install
if explicit:
# `-D` only accepts real installed names, so resolve any virtual/`provides` names (e.g. awk -> gawk)
resolved = [self._installed_name(pkg) for pkg in packages]
try:
subprocess.run([self.helper, "-D", "--asexplicit", *self.flags, *resolved], check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
warn(f"failed to mark packages as explicitly installed: {', '.join(resolved)}")
def remove(self, packages: list[str]) -> None:
if not packages:
return
_try_run([self.helper, "-Rns", *self.flags, *packages], f"failed to remove packages: {', '.join(packages)}")
def build_install(self, directory: Path) -> list[str]:
fields = _read_srcinfo(directory)
names = fields.get("pkgname", [])
depends = fields.get("depends", [])
self.install(depends, explicit=False)
# Stop makepkg from resetting sudo
env = {**os.environ, "PACMAN_AUTH": "sudo"}
# -f = force, -s = sync deps, -i = install
_try_run(
["makepkg", "-fsi", *self.flags], f"failed to build local package in {directory}", cwd=directory, env=env
)
# Clean build artifacts
for artifact in directory.glob("*.pkg.tar*"):
try:
artifact.unlink()
except OSError as e:
warn(f"failed to remove build artifact {artifact}: {e}")
return names
def _query(self, package: str) -> tuple[str, str] | None:
"""Return the installed (name, version) of `package`, resolving `provides` (e.g. awk -> gawk), or None."""
result = subprocess.run(
["pacman", "-Q", package],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
if result.returncode != 0:
return None
# `pacman -Q` resolves provides and prints "<real name> <version>"
parts = result.stdout.split()
return (parts[0], parts[1]) if len(parts) >= 2 else None
def _installed_name(self, package: str) -> str:
"""Resolve `package` to its real installed name (handles provides), falling back to the given name."""
query = self._query(package)
return query[0] if query else package
def installed_version(self, package: str) -> str | None:
query = self._query(package)
return query[1] if query else None
def needs_rebuild(self, directory: Path, packages: list[str]) -> bool:
built = _srcinfo_version(_read_srcinfo(directory))
if built is None:
return False # Can't determine the source version, leave as is
# Rebuild when installed version < repo version
# Don't rebuild packages that have been removed
return any(
(installed := self.installed_version(pkg)) is not None and _vercmp(built, installed) > 0 for pkg in packages
)
def system_update(self) -> None:
_try_run([self.helper, "-Syu", *self.flags], "failed to perform system update")
+123
View File
@@ -0,0 +1,123 @@
import shutil
import subprocess
from pathlib import Path
from caelestia.utils.dots.manifest import Manifest
from caelestia.utils.paths import dots_dir, get_config
class SourceError(Exception):
"""Raised when a git operation against the dots clone fails."""
class DotsSource:
_fetched_source: bool = False
def __init__(self) -> None:
cfg = get_config().get("dots", {})
self.url = cfg.get("url", "https://github.com/caelestia-dots/caelestia.git")
self.branch = cfg.get("branch", "main")
# Cache git blobs by (ref, relpath); objects are immutable for a given rev
self._blob_cache: dict[tuple[str, str], bytes] = {}
@property
def remote_ref(self) -> str:
return f"origin/{self.branch}"
def exists(self) -> bool:
return (dots_dir / ".git").is_dir()
def working_path(self, relpath: str | Path) -> Path:
"""Get a Path relative to the dots dir."""
return dots_dir / relpath
def ensure(self) -> None:
"""Clone the repo if absent, otherwise fetch the latest refs.
If the configured url changed, the stale clone is removed and re-cloned
from the new source.
"""
if self.exists():
if self.current_url() == self.url:
if DotsSource._fetched_source:
return
self._git("fetch", "--prune", "origin", self.branch)
DotsSource._fetched_source = True
return
shutil.rmtree(dots_dir)
dots_dir.parent.mkdir(parents=True, exist_ok=True)
self._run("git", "clone", "--branch", self.branch, self.url, str(dots_dir))
DotsSource._fetched_source = True
def current_url(self) -> str:
return self._git("remote", "get-url", "origin").strip()
def checkout_tip(self) -> str:
"""Reset the working tree to the fetched tip and return its commit hash."""
self._git("reset", "--hard", self.remote_ref)
return self.tip_rev()
def tip_rev(self) -> str:
return self._git("rev-parse", self.remote_ref).strip()
def changed_files(self, base: str, head: str) -> list[str]:
"""Repo-relative paths that differ between two revisions."""
out = self._git("diff", "--name-only", base, head)
return [line for line in out.splitlines() if line]
def has_rev(self, rev: str) -> bool:
"""Whether `rev` resolves to a commit."""
try:
self._git("rev-parse", "--verify", "--quiet", f"{rev}^{{commit}}")
return True
except SourceError:
return False
def clean(self) -> None:
"""Remove all untracked files in the git repo."""
self._git("clean", "-fdx")
# --- Accessors ---
def manifest_at(self, ref: str) -> Manifest:
return Manifest.parse(self.text_at(ref, "manifest.toml"))
def text_at(self, ref: str, relpath: str) -> str:
return self._git("show", f"{ref}:{relpath}")
def blob_at(self, ref: str, relpath: str) -> bytes:
key = (ref, relpath)
if key not in self._blob_cache:
self._blob_cache[key] = self._git_bytes("show", f"{ref}:{relpath}")
return self._blob_cache[key]
def files_at(self, ref: str, relpath: str) -> list[str]:
"""Repo-relative paths of all files under relpath at ref (the path itself if it is a file)."""
out = self._git("ls-tree", "-r", "--name-only", ref, "--", relpath)
return [line for line in out.splitlines() if line]
# --- Helpers ---
def _git(self, *args: str) -> str:
# core.quotePath=false so non-ASCII paths come back verbatim, not octal-escaped
return self._run("git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args)
def _git_bytes(self, *args: str) -> bytes:
cmd = ["git", "-C", str(dots_dir), "-c", "core.quotePath=false", *args]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
raise SourceError(result.stderr.decode().strip() or f"git {' '.join(args)} failed")
return result.stdout
def _run(self, *cmd: str) -> str:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise SourceError(result.stderr.strip() or f"{' '.join(cmd)} failed")
return result.stdout
+58
View File
@@ -0,0 +1,58 @@
import json
from dataclasses import dataclass, field
from caelestia.utils.dots.packages import DEFAULT_AUR_HELPER
from caelestia.utils.io import warn
from caelestia.utils.paths import atomic_dump, dots_state_path
@dataclass
class DotsState:
# The AUR helper selected selected at install time
aur_helper: str = "paru"
# The git rev of currently applied dots version
applied_rev: str | None = None
# The currently enabled components
enabled_components: list[str] = field(default_factory=list)
# Previously installed packages/local packages
packages: list[str] = field(default_factory=list)
local_packages: dict[str, list[str]] = field(default_factory=dict)
# Files placed by the last deploy. Only files, not directories
# Maps dest -> src
deployed_files: dict[str, str] = field(default_factory=dict)
@staticmethod
def load() -> "DotsState":
try:
data = json.loads(dots_state_path.read_text())
except FileNotFoundError:
return DotsState()
except json.JSONDecodeError:
warn("failed to parse current dots state.")
return DotsState()
return DotsState(
aur_helper=data.get("aur_helper", DEFAULT_AUR_HELPER),
applied_rev=data.get("applied_rev"),
enabled_components=data.get("enabled_components", []),
packages=data.get("packages", []),
local_packages=data.get("local_packages", {}),
deployed_files=data.get("deployed_files", {}),
)
def save(self) -> None:
atomic_dump(
dots_state_path,
{
"aur_helper": self.aur_helper,
"applied_rev": self.applied_rev,
"enabled_components": self.enabled_components,
"packages": self.packages,
"local_packages": self.local_packages,
"deployed_files": self.deployed_files,
},
)
+30
View File
@@ -7,6 +7,7 @@ socket_base = f"{os.getenv('XDG_RUNTIME_DIR')}/hypr/{os.getenv('HYPRLAND_INSTANC
socket_path = f"{socket_base}/.socket.sock" socket_path = f"{socket_base}/.socket.sock"
socket2_path = f"{socket_base}/.socket2.sock" socket2_path = f"{socket_base}/.socket2.sock"
_lua_config_cache: bool | None = None
def message(msg: str, is_json: bool = True) -> str | dict[str, Any]: def message(msg: str, is_json: bool = True) -> str | dict[str, Any]:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
@@ -26,7 +27,36 @@ def message(msg: str, is_json: bool = True) -> str | dict[str, Any]:
return json.loads(resp) if is_json else resp return json.loads(resp) if is_json else resp
def is_lua_config() -> bool:
global _lua_config_cache
if _lua_config_cache is not None:
return _lua_config_cache
try:
result = message("systeminfo", is_json=False)
for line in result.splitlines():
if "configProvider:" in line:
_lua_config_cache = "lua" in line.lower()
return _lua_config_cache
_lua_config_cache = False
return False
except Exception:
_lua_config_cache = False
return False
DISPATCHER_MAP_LUA = {
"togglespecialworkspace": lambda *a: f'hl.dsp.workspace.toggle_special("{a[0]}")' if a else 'hl.dsp.workspace.toggle_special()',
"movetoworkspacesilent": lambda *a: (
f'hl.dsp.window.move({{window = "address:{a[0].split(",")[1].replace("address:", "")}", workspace = "{a[0].split(",")[0]}", follow = false}})'
),
"exec": lambda *a: 'hl.dsp.exec_cmd("' + ' '.join(a).replace('\\', '\\\\').replace('"', '\\"') + '")',
}
def dispatch(dispatcher: str, *args: str) -> bool: def dispatch(dispatcher: str, *args: str) -> bool:
if is_lua_config() and dispatcher in DISPATCHER_MAP_LUA:
lua_dispatch = DISPATCHER_MAP_LUA[dispatcher](*args)
return message(f"dispatch {lua_dispatch}", is_json=False) == "ok"
return message(f"dispatch {dispatcher} {' '.join(map(str, args))}".rstrip(), is_json=False) == "ok" return message(f"dispatch {dispatcher} {' '.join(map(str, args))}".rstrip(), is_json=False) == "ok"
+139
View File
@@ -0,0 +1,139 @@
import sys
from typing import Never
LOG_COLOUR: int = 2
INFO_COLOUR: int = 0
PROMPT_COLOUR: int = 36
WARNING_COLOUR: int = 33
ERROR_COLOUR: int = 31
_disable_input: bool = False
def disable_input() -> None:
global _disable_input
_disable_input = True
def log_exception(func):
"""Log exceptions to stderr instead of raising.
Used by the `apply_()` functions so that an exception, when applying
a theme, does not prevent the other themes from being applied.
"""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
error(f'exception during "{func.__name__}()": {str(e)}')
return wrapper
def format_msg(colour: int, prefix: bool, msg: str) -> str:
return f"\033[{colour}m{':: ' if prefix else ''}{msg}\033[0m"
def log(msg: str, prefix: bool = True) -> None:
print(format_msg(LOG_COLOUR, prefix, msg))
def info(msg: str, prefix: bool = True) -> None:
print(format_msg(INFO_COLOUR, prefix, msg))
def warn(msg: str, prefix: bool = True) -> None:
print(format_msg(WARNING_COLOUR, prefix, f"Warning: {msg}"))
def error(err: str | Exception, prefix: bool = True) -> None:
print(format_msg(ERROR_COLOUR, prefix, f"Error: {err}"), file=sys.stderr)
def fatal(err: str | Exception, prefix: bool = True) -> Never:
print(format_msg(ERROR_COLOUR, prefix, f"Fatal: {err}"), file=sys.stderr)
sys.exit(1)
def _input(prompt: str) -> str:
if _disable_input:
print(prompt, end="")
return ""
try:
return input(prompt)
except (KeyboardInterrupt, EOFError):
print()
raise KeyboardInterrupt()
def prompt(msg: str, prefix: bool = True, end: str = " ") -> str:
return _input(format_msg(PROMPT_COLOUR, prefix, msg) + end)
def confirm(msg: str, prefix: bool = True, default: bool = True) -> bool:
suffix = " [Y/n]" if default else " [y/N]"
answer = prompt(msg + suffix, prefix=prefix).strip().lower()
if not answer:
return default
return answer in ("y", "yes")
def prompt_selection(items: list[str], header: str) -> list[str]:
"""Prompt the user to pick from a numbered list, returning the selected items.
Accepts `[A]ll`/`a`, single indices, ranges (`1-3`) and exclusions (`^4`).
Empty input selects nothing. Re-prompts until the input parses.
"""
print(format_msg(PROMPT_COLOUR, True, header))
max_idx_w = len(str(len(items)))
for i, item in enumerate(items):
print(format_msg(PROMPT_COLOUR, True, f" {i + 1:<{max_idx_w}}\t{item}"))
print(format_msg(PROMPT_COLOUR, True, "[A]ll or (1 2 3, 1-3, ^4)"))
def valid_idx(v: str) -> int:
try:
idx = int(v, base=10) - 1 # -1 to translate to 0 index
except ValueError:
raise ValueError(f'Given value "{v}" must be an integer.')
if idx < 0 or idx >= len(items):
raise ValueError(f'Given value "{v}" must be between 1 and {len(items)} inclusive.')
return idx
def parse(ans: str) -> list[str]:
if ans in ("a", "all"):
return list(items)
if not ans:
return []
selected: list[str] = []
for tok in ans.split():
fr, sep, to = tok.partition("-")
if sep:
lo, hi = valid_idx(fr), valid_idx(to)
if lo > hi:
raise ValueError(f'Given range "{tok}" must be lo-hi.')
selected += items[lo : hi + 1]
elif tok.startswith("^"):
t = valid_idx(tok[1:])
selected += items[:t] + items[t + 1 :]
else:
selected.append(items[valid_idx(tok)])
return list(set(selected))
while True:
ans = prompt("", end="").lower().strip()
try:
return parse(ans)
except ValueError as e:
warn(f"invalid input. {e} Please try again.")
def pause() -> None:
if _disable_input:
return
_input("\n\033[2m\033[3m(Ctrl+C to exit, enter to continue)\033[0m")
print("\033[1A\r\033[2K\033[1A\r\033[2K", end="") # Clear pause prompt
-22
View File
@@ -1,22 +0,0 @@
from time import strftime
def log_message(message: str) -> None:
timestamp = strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {message}")
def log_exception(func):
"""Log exceptions to stdout instead of raising
Used by the `apply_()` functions so that an exception, when applying
a theme, does not prevent the other themes from being applied.
"""
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
log_message(f'Error during execution of "{func.__name__}()": {str(e)}')
return wrapper
+1 -1
View File
@@ -18,7 +18,7 @@ from typing import Protocol, Any
# subclasses in get_scheme() handle that internally. This Protocol tells the type # subclasses in get_scheme() handle that internally. This Protocol tells the type
# checker to expect our specific 3-argument setup instead of the base class signature. # checker to expect our specific 3-argument setup instead of the base class signature.
class SchemeConstructor(Protocol): class SchemeConstructor(Protocol):
def __call__(self, source_color_hct: Any, is_dark: bool, contrast_level: float) -> DynamicScheme: ... def __call__(self, source_color_hct: Any, is_dark: bool, contrast_level: float) -> "DynamicScheme": ...
try: try:
from materialyoucolor.dynamiccolor.dynamic_scheme import DynamicScheme from materialyoucolor.dynamiccolor.dynamic_scheme import DynamicScheme
+31 -5
View File
@@ -1,11 +1,12 @@
import hashlib import hashlib
import json import json
import os import os
import shutil
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from caelestia.utils.io import warn
config_dir: Path = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) config_dir: Path = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config"))
data_dir: Path = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local/share")) data_dir: Path = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local/share"))
state_dir: Path = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local/state")) state_dir: Path = Path(os.getenv("XDG_STATE_HOME", Path.home() / ".local/state"))
@@ -24,6 +25,10 @@ templates_dir: Path = cli_data_dir / "templates"
user_templates_dir: Path = c_config_dir / "templates" user_templates_dir: Path = c_config_dir / "templates"
theme_dir: Path = c_state_dir / "theme" theme_dir: Path = c_state_dir / "theme"
config_backup_dir: Path = config_dir.parent / f"{config_dir.name}.bak"
dots_dir: Path = c_state_dir / "dots"
dots_state_path: Path = c_state_dir / "dots-state.json"
scheme_path: Path = c_state_dir / "scheme.json" scheme_path: Path = c_state_dir / "scheme.json"
scheme_data_dir: Path = cli_data_dir / "schemes" scheme_data_dir: Path = cli_data_dir / "schemes"
scheme_cache_dir: Path = c_cache_dir / "schemes" scheme_cache_dir: Path = c_cache_dir / "schemes"
@@ -52,8 +57,29 @@ def compute_hash(path: Path | str) -> str:
return sha.hexdigest() return sha.hexdigest()
def atomic_dump(path: Path, content: dict[str, Any]) -> None: def atomic_write(path: Path, content: str) -> None:
with tempfile.NamedTemporaryFile("w") as f: path.parent.mkdir(parents=True, exist_ok=True)
json.dump(content, f) f = tempfile.NamedTemporaryFile("w", dir=path.parent, delete=False)
try:
with f:
f.write(content)
f.flush() f.flush()
shutil.move(f.name, path) os.fsync(f.fileno())
os.replace(f.name, path)
except BaseException:
os.unlink(f.name)
raise
def atomic_dump(path: Path, content: dict[str, Any]) -> None:
atomic_write(path, json.dumps(content))
def get_config() -> dict[str, Any]:
try:
return json.loads(user_config_path.read_text())
except json.JSONDecodeError:
warn("failed to parse config, invalid JSON")
except FileNotFoundError:
pass
return {}
+59 -40
View File
@@ -1,5 +1,6 @@
import fcntl import fcntl
import json import json
import os
import re import re
import shutil import shutil
import subprocess import subprocess
@@ -7,16 +8,19 @@ import tempfile
from pathlib import Path from pathlib import Path
from caelestia.utils.colour import get_dynamic_colours from caelestia.utils.colour import get_dynamic_colours
from caelestia.utils.logging import log_exception from caelestia.utils.hypr import is_lua_config
from caelestia.utils.io import log_exception
from caelestia.utils.paths import ( from caelestia.utils.paths import (
atomic_write,
c_state_dir, c_state_dir,
config_dir, config_dir,
data_dir, data_dir,
get_config,
templates_dir, templates_dir,
theme_dir, theme_dir,
user_config_path,
user_templates_dir, user_templates_dir,
) )
from caelestia.utils.scheme import get_scheme
def gen_conf(colours: dict[str, str]) -> str: def gen_conf(colours: dict[str, str]) -> str:
@@ -26,6 +30,14 @@ def gen_conf(colours: dict[str, str]) -> str:
return conf return conf
def gen_lua(colours: dict[str, str]) -> str:
lua = "return {\n"
for name, colour in colours.items():
lua += f' {name} = "{colour}",\n'
lua += "}"
return lua
def gen_scss(colours: dict[str, str]) -> str: def gen_scss(colours: dict[str, str]) -> str:
scss = "" scss = ""
for name, colour in colours.items(): for name, colour in colours.items():
@@ -108,15 +120,6 @@ def gen_sequences(colours: dict[str, str]) -> str:
) )
def write_file(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile("w") as f:
f.write(content)
f.flush()
shutil.move(f.name, path)
@log_exception @log_exception
def apply_terms(sequences: str) -> None: def apply_terms(sequences: str) -> None:
state = c_state_dir / "sequences.txt" state = c_state_dir / "sequences.txt"
@@ -142,57 +145,56 @@ def apply_terms(sequences: str) -> None:
@log_exception @log_exception
def apply_hypr(conf: str) -> None: def apply_hypr(conf: str) -> None:
write_file(config_dir / "hypr/scheme/current.conf", conf) ext = "lua" if is_lua_config() else "conf"
atomic_write(config_dir / f"hypr/scheme/current.{ext}", conf)
@log_exception @log_exception
def apply_discord(scss: str) -> None: def apply_discord(scss: str) -> None:
import tempfile
with tempfile.TemporaryDirectory("w") as tmp_dir: with tempfile.TemporaryDirectory("w") as tmp_dir:
(Path(tmp_dir) / "_colours.scss").write_text(scss) (Path(tmp_dir) / "_colours.scss").write_text(scss)
conf = subprocess.check_output(["sass", "-I", tmp_dir, templates_dir / "discord.scss"], text=True) conf = subprocess.check_output(["sass", "-I", tmp_dir, templates_dir / "discord.scss"], text=True)
for client in "Equicord", "Vencord", "BetterDiscord", "equibop", "vesktop", "legcord": for client in "Equicord", "Vencord", "BetterDiscord", "equibop", "vesktop", "legcord":
write_file(config_dir / client / "themes/caelestia.theme.css", conf) atomic_write(config_dir / client / "themes/caelestia.theme.css", conf)
@log_exception @log_exception
def apply_pandora(colours: dict[str, str], mode: str) -> None: def apply_pandora(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / "pandora.json", hash=True) template = gen_replace(colours, templates_dir / "pandora.json", hash=True)
template = template.replace("{{ $mode }}", mode) template = template.replace("{{ $mode }}", mode)
write_file(data_dir / "PandoraLauncher/themes/caelestia.json", template) atomic_write(data_dir / "PandoraLauncher/themes/caelestia.json", template)
@log_exception @log_exception
def apply_spicetify(colours: dict[str, str], mode: str) -> None: def apply_spicetify(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / f"spicetify-{mode}.ini") template = gen_replace(colours, templates_dir / f"spicetify-{mode}.ini")
write_file(config_dir / "spicetify/Themes/caelestia/color.ini", template) atomic_write(config_dir / "spicetify/Themes/caelestia/color.ini", template)
@log_exception @log_exception
def apply_fuzzel(colours: dict[str, str]) -> None: def apply_fuzzel(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "fuzzel.ini") template = gen_replace(colours, templates_dir / "fuzzel.ini")
write_file(config_dir / "fuzzel/fuzzel.ini", template) atomic_write(config_dir / "fuzzel/fuzzel.ini", template)
@log_exception @log_exception
def apply_btop(colours: dict[str, str]) -> None: def apply_btop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "btop.theme", hash=True) template = gen_replace(colours, templates_dir / "btop.theme", hash=True)
write_file(config_dir / "btop/themes/caelestia.theme", template) atomic_write(config_dir / "btop/themes/caelestia.theme", template)
subprocess.run(["killall", "-USR2", "btop"], stderr=subprocess.DEVNULL) subprocess.run(["killall", "-USR2", "btop"], stderr=subprocess.DEVNULL)
@log_exception @log_exception
def apply_nvtop(colours: dict[str, str]) -> None: def apply_nvtop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "nvtop.colors", hash=True) template = gen_replace(colours, templates_dir / "nvtop.colors", hash=True)
write_file(config_dir / "nvtop/nvtop.colors", template) atomic_write(config_dir / "nvtop/nvtop.colors", template)
@log_exception @log_exception
def apply_htop(colours: dict[str, str]) -> None: def apply_htop(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "htop.theme", hash=True) template = gen_replace(colours, templates_dir / "htop.theme", hash=True)
write_file(config_dir / "htop/htoprc", template) atomic_write(config_dir / "htop/htoprc", template)
subprocess.run(["killall", "-USR2", "htop"], stderr=subprocess.DEVNULL) subprocess.run(["killall", "-USR2", "htop"], stderr=subprocess.DEVNULL)
@@ -302,30 +304,33 @@ def _determine_hue_color(r: int, g: int, b: int, brightness: int, use_pale: bool
@log_exception @log_exception
def apply_gtk(colours: dict[str, str], mode: str) -> None: def apply_gtk(colours: dict[str, str], mode: str, icon_theme: str | None = None) -> None:
gtk_template = gen_replace(colours, templates_dir / "gtk.css", hash=True) gtk_template = gen_replace(colours, templates_dir / "gtk.css", hash=True)
thunar_template = gen_replace(colours, templates_dir / "thunar.css", hash=True) thunar_template = gen_replace(colours, templates_dir / "thunar.css", hash=True)
for gtk_version in ["gtk-3.0", "gtk-4.0"]: for gtk_version in ["gtk-3.0", "gtk-4.0"]:
gtk_config_dir = config_dir / gtk_version gtk_config_dir = config_dir / gtk_version
write_file(gtk_config_dir / "gtk.css", gtk_template) atomic_write(gtk_config_dir / "gtk.css", gtk_template)
write_file(gtk_config_dir / "thunar.css", thunar_template) atomic_write(gtk_config_dir / "thunar.css", thunar_template)
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/gtk-theme", "'adw-gtk3-dark'"]) subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/gtk-theme", "'adw-gtk3-dark'"])
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/color-scheme", f"'prefer-{mode}'"]) subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/color-scheme", f"'prefer-{mode}'"])
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/icon-theme", f"'Papirus-{mode.capitalize()}'"]) gtk_icon_theme = icon_theme if icon_theme is not None else f"Papirus-{mode.capitalize()}"
subprocess.run(["dconf", "write", "/org/gnome/desktop/interface/icon-theme", f"'{gtk_icon_theme}'"])
sync_papirus_colors(colours["primary"]) sync_papirus_colors(colours["primary"])
@log_exception @log_exception
def apply_qt(colours: dict[str, str], mode: str) -> None: def apply_qt(colours: dict[str, str], mode: str, icon_theme: str | None = None) -> None:
colours = gen_replace(colours, templates_dir / f"qt{mode}.colors", hash=True) colours = gen_replace(colours, templates_dir / f"qt{mode}.colors", hash=True)
write_file(config_dir / "qtengine/caelestia.colors", colours) atomic_write(config_dir / "qtengine/caelestia.colors", colours)
config = (templates_dir / "qtengine.json").read_text() config = (templates_dir / "qtengine.json").read_text()
config = config.replace("{{ $mode }}", mode.capitalize()) config = config.replace("{{ $mode }}", mode.capitalize())
write_file(config_dir / "qtengine/config.json", config) if icon_theme is not None:
config = config.replace(f'"iconTheme": "Papirus-{mode.capitalize()}"', f'"iconTheme": "{icon_theme}"')
atomic_write(config_dir / "qtengine/config.json", config)
@log_exception @log_exception
@@ -334,7 +339,7 @@ def apply_warp(colours: dict[str, str], mode: str) -> None:
template = gen_replace(colours, templates_dir / "warp.yaml", hash=True) template = gen_replace(colours, templates_dir / "warp.yaml", hash=True)
template = template.replace("{{ $warp_mode }}", warp_mode) template = template.replace("{{ $warp_mode }}", warp_mode)
write_file(data_dir / "warp-terminal/themes/caelestia.yaml", template) atomic_write(data_dir / "warp-terminal/themes/caelestia.yaml", template)
@log_exception @log_exception
@@ -356,7 +361,7 @@ def apply_chromium(colours: dict[str, str]) -> None:
print(f"Unable to create {policy_dir} directory") print(f"Unable to create {policy_dir} directory")
continue continue
# Use tee instead of write_file cause we need sudo # Use tee instead of atomic_write cause we need sudo
subprocess.run( subprocess.run(
["sudo", "-n", "tee", str(policy_dir / "caelestia.json")], ["sudo", "-n", "tee", str(policy_dir / "caelestia.json")],
input=json.dumps({"BrowserThemeColor": theme_color, "BrowserColorScheme": "device"}), input=json.dumps({"BrowserThemeColor": theme_color, "BrowserColorScheme": "device"}),
@@ -379,13 +384,13 @@ def apply_zed(colours: dict[str, str], mode: str) -> None:
theme_path.unlink() theme_path.unlink()
content = gen_replace_dynamic(colours, templates_dir / "zed.json", mode) content = gen_replace_dynamic(colours, templates_dir / "zed.json", mode)
write_file(theme_path, content) atomic_write(theme_path, content)
@log_exception @log_exception
def apply_cava(colours: dict[str, str]) -> None: def apply_cava(colours: dict[str, str]) -> None:
template = gen_replace(colours, templates_dir / "cava.conf", hash=True) template = gen_replace(colours, templates_dir / "cava.conf", hash=True)
write_file(config_dir / "cava/config", template) atomic_write(config_dir / "cava/config", template)
subprocess.run(["killall", "-USR2", "cava"], stderr=subprocess.DEVNULL) subprocess.run(["killall", "-USR2", "cava"], stderr=subprocess.DEVNULL)
@@ -397,7 +402,7 @@ def apply_user_templates(colours: dict[str, str], mode: str) -> None:
for file in user_templates_dir.iterdir(): for file in user_templates_dir.iterdir():
if file.is_file(): if file.is_file():
content = gen_replace_dynamic(colours, file, mode) content = gen_replace_dynamic(colours, file, mode)
write_file(theme_dir / file.name, content) atomic_write(theme_dir / file.name, content)
def apply_colours(colours: dict[str, str], mode: str) -> None: def apply_colours(colours: dict[str, str], mode: str) -> None:
@@ -412,10 +417,7 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
except BlockingIOError: except BlockingIOError:
return return
try: cfg = get_config().get("theme", {})
cfg = json.loads(user_config_path.read_text())["theme"]
except (FileNotFoundError, json.JSONDecodeError, KeyError):
cfg = {}
def check(key: str) -> bool: def check(key: str) -> bool:
return cfg[key] if key in cfg else True return cfg[key] if key in cfg else True
@@ -423,7 +425,7 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
if check("enableTerm"): if check("enableTerm"):
apply_terms(gen_sequences(colours)) apply_terms(gen_sequences(colours))
if check("enableHypr"): if check("enableHypr"):
apply_hypr(gen_conf(colours)) apply_hypr(gen_lua(colours) if is_lua_config() else gen_conf(colours))
if check("enableDiscord"): if check("enableDiscord"):
apply_discord(gen_scss(colours)) apply_discord(gen_scss(colours))
if check("enableSpicetify"): if check("enableSpicetify"):
@@ -438,10 +440,11 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
apply_nvtop(colours) apply_nvtop(colours)
if check("enableHtop"): if check("enableHtop"):
apply_htop(colours) apply_htop(colours)
icon_theme = cfg.get(f"iconTheme{mode.capitalize()}") or cfg.get("iconTheme")
if check("enableGtk"): if check("enableGtk"):
apply_gtk(colours, mode) apply_gtk(colours, mode, icon_theme)
if check("enableQt"): if check("enableQt"):
apply_qt(colours, mode) apply_qt(colours, mode, icon_theme)
if check("enableWarp"): if check("enableWarp"):
apply_warp(colours, mode) apply_warp(colours, mode)
if check("enableChromium"): if check("enableChromium"):
@@ -452,6 +455,22 @@ def apply_colours(colours: dict[str, str], mode: str) -> None:
apply_cava(colours) apply_cava(colours)
apply_user_templates(colours, mode) apply_user_templates(colours, mode)
if post_hook := cfg.get("postHook"):
scheme = get_scheme()
subprocess.run(
post_hook,
shell=True,
env={
**os.environ,
"SCHEME_NAME": scheme.name,
"SCHEME_FLAVOUR": scheme.flavour,
"SCHEME_MODE": scheme.mode,
"SCHEME_VARIANT": scheme.variant,
"SCHEME_COLOURS": json.dumps(scheme.colours),
},
stderr=subprocess.DEVNULL,
)
finally: finally:
try: try:
lock_file.unlink() lock_file.unlink()
+13 -8
View File
@@ -2,7 +2,6 @@ import json
import os import os
import random import random
import subprocess import subprocess
from argparse import Namespace from argparse import Namespace
from pathlib import Path from pathlib import Path
from typing import cast from typing import cast
@@ -11,12 +10,12 @@ from materialyoucolor.hct import Hct
from materialyoucolor.utils.color_utils import argb_from_rgb from materialyoucolor.utils.color_utils import argb_from_rgb
from PIL import Image from PIL import Image
from caelestia.utils.colourfulness import get_variant
from caelestia.utils.hypr import message from caelestia.utils.hypr import message
from caelestia.utils.material import get_colours_for_image from caelestia.utils.material import get_colours_for_image
from caelestia.utils.colourfulness import get_variant
from caelestia.utils.paths import ( from caelestia.utils.paths import (
compute_hash, compute_hash,
user_config_path, get_config,
wallpaper_link_path, wallpaper_link_path,
wallpaper_path_path, wallpaper_path_path,
wallpaper_thumbnail_path, wallpaper_thumbnail_path,
@@ -186,17 +185,23 @@ def set_wallpaper(wall: Path, no_smart: bool) -> None:
apply_colours(scheme.colours, scheme.mode) apply_colours(scheme.colours, scheme.mode)
# Run custom post-hook if configured # Run custom post-hook if configured
try: cfg = get_config().get("wallpaper", {})
cfg = json.loads(user_config_path.read_text()).get("wallpaper", {})
if post_hook := cfg.get("postHook"): if post_hook := cfg.get("postHook"):
subprocess.run( subprocess.run(
post_hook, post_hook,
shell=True, shell=True,
env={**os.environ, "WALLPAPER_PATH": str(wall)}, env={
**os.environ,
"WALLPAPER_PATH": str(wall),
"SCHEME_NAME": scheme.name,
"SCHEME_FLAVOUR": scheme.flavour,
"SCHEME_MODE": scheme.mode,
"SCHEME_VARIANT": scheme.variant,
"SCHEME_COLOURS": json.dumps(scheme.colours),
"THUMBNAIL_PATH": str(thumb),
},
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
) )
except (FileNotFoundError, json.JSONDecodeError):
pass
def set_random(args: Namespace) -> None: def set_random(args: Namespace) -> None: