record: wl-screenrec -> gpu-screen-recorder

Supports NVIDIA, so no need for having a fallback
Also supports pausing
This commit is contained in:
2 * r + 2 * t
2025-09-13 22:58:57 +10:00
parent 4263e5f809
commit 2eda287a80
4 changed files with 47 additions and 75 deletions
+1
View File
@@ -70,6 +70,7 @@ def parse_args() -> (argparse.ArgumentParser, argparse.Namespace):
record_parser.set_defaults(cls=record.Command)
record_parser.add_argument("-r", "--region", nargs="?", const="slurp", help="record a region")
record_parser.add_argument("-s", "--sound", action="store_true", help="record audio")
record_parser.add_argument("-p", "--pause", action="store_true", help="pause/resume the recording")
# Create parser for clipboard opts
clipboard_parser = command_parser.add_parser("clipboard", help="open clipboard history")
+42 -69
View File
@@ -1,4 +1,5 @@
import json
import re
import shutil
import subprocess
import time
@@ -8,108 +9,80 @@ from datetime import datetime
from caelestia.utils.notify import close_notification, notify
from caelestia.utils.paths import recording_notif_path, recording_path, recordings_dir
RECORDER = "gpu-screen-recorder"
class Command:
args: Namespace
recorder: str
def __init__(self, args: Namespace) -> None:
self.args = args
self.recorder = self._detect_recorder()
def _detect_recorder(self) -> str:
"""Detect which screen recorder to use based on GPU."""
try:
# Check for NVIDIA GPU
lspci_output = subprocess.check_output(["lspci"], text=True)
if "nvidia" in lspci_output.lower():
# Check if wf-recorder is available
if shutil.which("wf-recorder"):
return "wf-recorder"
# Default to wl-screenrec if available
if shutil.which("wl-screenrec"):
return "wl-screenrec"
# Fallback to wf-recorder if wl-screenrec is not available
if shutil.which("wf-recorder"):
return "wf-recorder"
raise RuntimeError("No compatible screen recorder found")
except subprocess.CalledProcessError:
# If lspci fails, default to wl-screenrec
return "wl-screenrec" if shutil.which("wl-screenrec") else "wf-recorder"
def run(self) -> None:
if self.proc_running():
if self.args.pause:
subprocess.run(["pkill", "-USR2", "-f", RECORDER], stdout=subprocess.DEVNULL)
elif self.proc_running():
self.stop()
else:
self.start()
def proc_running(self) -> bool:
return subprocess.run(["pidof", self.recorder], stdout=subprocess.DEVNULL).returncode == 0
return subprocess.run(["pidof", RECORDER], stdout=subprocess.DEVNULL).returncode == 0
def intersects(self, a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool:
return a[0] < b[0] + b[2] and a[0] + a[2] > b[0] and a[1] < b[1] + b[3] and a[1] + a[3] > b[1]
def start(self) -> None:
args = []
args = ["-w"]
monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"]))
if self.args.region:
if self.args.region == "slurp":
region = subprocess.check_output(["slurp"], text=True)
region = subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True)
else:
region = self.args.region
args += ["-g", region.strip()]
region = self.args.region.strip()
args += ["region", "-region", region]
m = re.match(r"(\d+)x(\d+)\+(\d+)\+(\d+)", region)
if not m:
raise ValueError(f"Invalid region: {region}")
w, h, x, y = map(int, m.groups())
r = x, y, w, h
max_rr = 0
for monitor in monitors:
if self.intersects((monitor["x"], monitor["y"], monitor["width"], monitor["height"]), r):
rr = round(monitor["refreshRate"])
max_rr = max(max_rr, rr)
args += ["-f", str(max_rr)]
else:
monitors = json.loads(subprocess.check_output(["hyprctl", "monitors", "-j"]))
focused_monitor = next(monitor for monitor in monitors if monitor["focused"])
if focused_monitor:
args += ["-o", focused_monitor["name"]]
args += [focused_monitor["name"], "-f", str(round(focused_monitor["refreshRate"]))]
if self.args.sound:
sources = subprocess.check_output(["pactl", "list", "short", "sources"], text=True).splitlines()
audio_source = None
for source in sources:
if "RUNNING" in source:
audio_source = source.split()[1]
break
# Fallback to IDLE source if no RUNNING source
if not audio_source:
for source in sources:
if "IDLE" in source:
audio_source = source.split()[1]
break
if not audio_source:
raise ValueError("No audio source found")
if self.recorder == "wf-recorder":
args += [f"--audio={audio_source}"]
else:
args += ["--audio", "--audio-device", audio_source]
args += ["-a", "default_output"]
recording_path.parent.mkdir(parents=True, exist_ok=True)
proc = subprocess.Popen(
[self.recorder, *args, "-f", recording_path],
stderr=subprocess.PIPE,
text=True,
start_new_session=True,
)
proc = subprocess.Popen([RECORDER, *args, "-o", str(recording_path)], start_new_session=True)
notif = notify("-p", "Recording started", "Recording...")
recording_notif_path.write_text(notif)
for _ in range(5):
if proc.poll() is not None:
if proc.returncode != 0:
close_notification(notif)
notify("Recording failed", f"Recording error: {proc.communicate()[1]}")
return
time.sleep(0.2)
try:
if proc.wait(1) != 0:
close_notification(notif)
notify(
"Recording failed",
"An error occurred attempting to start recorder. "
f"Command `{' '.join(proc.args)}` failed with exit code {proc.returncode}",
)
except subprocess.TimeoutExpired:
pass
def stop(self) -> None:
# Start killing recording process
subprocess.run(["pkill", self.recorder])
subprocess.run(["pkill", "-f", RECORDER], stdout=subprocess.DEVNULL)
# Wait for recording to finish to avoid corrupted video file
while self.proc_running():