🧱 massive revamp

This commit is contained in:
sakamoto
2024-08-05 07:17:19 -05:00
parent a3f1d318b9
commit 1f892d926a
2 changed files with 165 additions and 225 deletions
-1
View File
@@ -3,6 +3,5 @@
img2pdf==0.4.4 img2pdf==0.4.4
numpy==2.0.1 numpy==2.0.1
opencv-python==4.10.0.84 opencv-python==4.10.0.84
pillow==10.4.0
tqdm==4.66.4 tqdm==4.66.4
yt-dlp==2024.8.1 yt-dlp==2024.8.1
+165 -224
View File
@@ -1,301 +1,242 @@
import cv2 from utils import Download
import numpy as np from utils import Misc
import os from utils import log
import re from utils import src, dest, alt_temp, format, ignore_temp, verbose
import img2pdf
import tempfile
import yt_dlp
import argparse
from PIL import Image
from tqdm import tqdm from tqdm import tqdm
import tempfile
import os
import cv2
import re
import numpy as np
import img2pdf
import shutil
# test # temp
# https://www.youtube.com/watch?v=tyloC0e-Tqk # src: str = "../test/hello.webm"
# ./ # src: str = "https://www.youtube.com/watch?v=tyloC0e-Tqk"
# dest: str = "/home/sakamoto/Public/test"
# this is so overengineered that the bloat takes up 1/4 of the line
parser = argparse.ArgumentParser(
description="Converts Video static images based on significant frame changes to sheet music in a form of .pdf file.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("source", type=str, help="source of the file or a YouTube link")
parser.add_argument("destination", type=str, help="destination of the output file")
parser.add_argument("-v", "--verbose", action="store_true", help="enable debug mode")
parser.add_argument(
"-t",
"--change-threshold",
type=int,
default=12500000,
help="take a screenshot based on threshold",
)
args = parser.parse_args()
src = args.source
dest = args.destination
verbose = args.verbose
change_threshold = args.change_threshold
print(f"[INFO] The source file is: {src}")
print(f"[INFO] The destination file is: {dest}")
print("[INFO] Verbose enabled") if verbose is True else None
class Vid2Sheet: class Vid2Sheet:
def __init__(self, src, dest, frame_starts_at: int = 0): def __init__(self, src, dest, img_format: str, use_tempfile=True, ignore_temp=False):
self.src = src self.src = src
self.dest = dest self.dest = dest
self.img_format = img_format
self.temp_dir = tempfile.TemporaryDirectory() self.download = Download()
self.output_dir = self.temp_dir.name self.misc = Misc()
self.combined_img_dir = os.path.join(self.output_dir, "combined_img")
self.video_dir = os.path.join(self.output_dir, "video")
self.video_title = None
os.makedirs(self.combined_img_dir, exist_ok=True) self.dest_temp_dir = os.path.join(dest, "tmp")
self.define_temp = (
tempfile.TemporaryDirectory() if use_tempfile else self.dest_temp_dir
)
self.temp_dir = (
self.define_temp.name
if isinstance(self.define_temp, tempfile.TemporaryDirectory)
else self.define_temp
)
self.stitch_dir = os.path.join(self.temp_dir, "stitch")
self.video_dir = os.path.join(self.temp_dir, "video")
self.raw_dir = os.path.join(self.temp_dir, "raw")
os.makedirs(self.dest_temp_dir, exist_ok=True)
os.makedirs(self.temp_dir, exist_ok=True)
os.makedirs(self.stitch_dir, exist_ok=True)
os.makedirs(self.video_dir, exist_ok=True) os.makedirs(self.video_dir, exist_ok=True)
os.makedirs(self.dest, exist_ok=True) os.makedirs(self.raw_dir, exist_ok=True)
self.frame_count = frame_starts_at
self.extracted_count = 0
self.previous_frame = None self.previous_frame = None
# self.pbar = None log.info(f"Source: {self.src}")
self.total_frames = 0 log.info(f"Destination: {self.dest}")
log.debug(f"Use Tempfile? {use_tempfile}")
log.debug(f"Ignore temp delete? {ignore_temp}")
log.debug(f"Verbose? {verbose}")
log.debug(f"Temporary Directory: {self.temp_dir}")
log.debug(f"Stitch Directory: {self.stitch_dir}")
log.debug(f"Video Directory: {self.video_dir}")
if verbose: def __del__(self):
print(f"[DEBUG] temp_dir: {self.temp_dir}") if isinstance(self.define_temp, tempfile.TemporaryDirectory):
print(f"[DEBUG] output_dir: {self.output_dir}") log.debug(f"Deleting: {self.temp_dir}")
print(f"[DEBUG] combined_img_dir: {self.combined_img_dir}") self.define_temp.cleanup()
print(f"[DEBUG] video_dir: {self.video_dir}") else:
try:
if ignore_temp:
log.debug(f"Deleting: {self.dest_temp_dir}")
shutil.rmtree(self.dest_temp_dir)
except Exception as e:
log.error(e)
def run(self): def run(self):
self.check_video() self.check()
self.analyze_frame(change_threshold) self.capture()
self.combine_in_pairs() self.stitch()
self.pbar.close() self.convert()
self.convert_to_pdf()
def install_yt(self): def check(self):
youtube_pattern = re.compile( youtube_pattern = re.compile(
r"(https?://)?(www\.)?" r"(https?://)?(www\.)?"
r"(youtube\.com/watch\?v=|youtu\.be/)" r"(youtube\.com/watch\?v=|youtu\.be/)"
r"[a-zA-Z0-9_-]{11}", r"[a-zA-Z0-9_-]{11}",
re.IGNORECASE, re.IGNORECASE,
) )
if re.match(youtube_pattern, self.src):
print("[INFO] Detected YouTube link")
ydl_opts = {
"outtmpl": f"{self.video_dir}/%(title)s.%(ext)s",
"quiet": True,
"progress_hooks": [self._hook],
}
print("[INFO] Attempting to start download...")
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([self.src])
return True
except Exception as e:
print(f"[ERR] An error occurred: {e}")
return False
def check_video(self): playlist_pattern = re.compile(
if self.install_yt(): r"(https?://)?(www\.)?(youtube\.com/playlist\?list=)[a-zA-Z0-9_-]+",
print("[INFO] Finished downloading") re.IGNORECASE,
)
if re.match(youtube_pattern, self.src):
log.info("Detected YouTube link")
self.download.video(self.src, self.video_dir)
all_entries = os.listdir(self.video_dir) all_entries = os.listdir(self.video_dir)
files = [ files = [
entry os.path.join(self.video_dir, entry)
for entry in all_entries for entry in all_entries
if os.path.isfile(os.path.join(self.video_dir, entry)) if os.path.isfile(os.path.join(self.video_dir, entry))
] ]
print(f"[INFO] Found {files}, in {self.video_dir}") self.src = os.path.join(self.video_dir, files[0])
if files: elif re.match(playlist_pattern, self.src):
self.cap = cv2.VideoCapture(os.path.join(self.video_dir, files[0])) log.info("Detected YouTube playlist")
else: self.download.playlist(self.src, self.video_dir)
print(f"[ERR] No files found in the directory {self.video_dir}.")
exit()
else:
self.cap = cv2.VideoCapture(self.src)
self.video_title = os.path.splitext(os.path.basename(self.src))[0]
if not self.cap.isOpened(): self.video = cv2.VideoCapture(self.src)
print("[ERR] Could not open video.") self.title = os.path.splitext(os.path.basename(self.src))[0]
exit()
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.total_frames = int(self.video.get(cv2.CAP_PROP_FRAME_COUNT))
self.pbar = tqdm(
if not self.video.isOpened():
log.error(f"Could not open video: {self.src}")
return
log.debug(f"Total frames: {self.total_frames}")
def capture(self, change_threshold=12500000):
count = 0
frame_count = 0
previous_frame = None
log.info("Analyzing and capturing caught frames; this may take a while...")
self.pbar_capture = tqdm(
total=self.total_frames, total=self.total_frames,
desc="Analyzing Frames", desc="Capturing significant changes",
bar_format="{l_bar}{bar} | {n_fmt}/{total_fmt} frames | {rate_fmt} | {elapsed} elapsed", bar_format="{l_bar}{bar} | {n_fmt}/{total_fmt} frames | {rate_fmt} | {elapsed} elapsed",
) )
(
print(f"[DEBUG] Total number of frames in the video: {self.total_frames}")
if verbose is True
else None
)
def _hook(self, d):
if d["status"] == "finished":
self.video_title = d.get("info_dict", {}).get("title", "unknown_title")
def analyze_frame(self, change_threshold=12500000):
(
print(f"[DEBUG] Change threshold is set to {change_threshold}")
if verbose is True
else None
)
while True: while True:
ret, current_frame = self.cap.read() ret, current_frame = self.video.read()
if not ret: if not ret:
break break
self.gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY) gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
img_output = os.path.join(self.raw_dir, f"img_{count:03}.{self.img_format}")
if self.previous_frame is None: if previous_frame is None:
image_path = os.path.join( log.debug(f"Starting at frame {frame_count}")
self.output_dir, f"image_{self.extracted_count:03}.jpg" cv2.imwrite(img_output, current_frame)
) count += 1
cv2.imwrite(image_path, current_frame)
self.extracted_count += 1
(
self.pbar.set_description(
f"[DEBUG] Start at Frame {self.frame_count}, saved as {image_path}"
)
if verbose is True
else self.pbar.set_description(
f"[INFO] Analyzing {self.video_title}"
)
)
else: else:
frame_diff = cv2.absdiff(self.previous_frame, self.gray_current) frame_diff = cv2.absdiff(previous_frame, gray_current)
diff_sum = np.sum(frame_diff) diff_sum = np.sum(frame_diff)
if diff_sum > change_threshold: if diff_sum > change_threshold:
image_path = os.path.join( log.debug(
self.output_dir, f"image_{self.extracted_count:03}.jpg" f"Significant change found at frame {frame_count}, saving to {os.path.join(self.raw_dir, f"{img_output}")}"
) )
cv2.imwrite(image_path, current_frame) cv2.imwrite(img_output, current_frame)
( count += 1
self.pbar.set_description(
f"[DEBUG] Frame {self.frame_count} changed significantly, saved as {image_path}"
)
if verbose is True
else None
)
self.extracted_count += 1
self.previous_frame = self.gray_current frame_count += 1
self.frame_count += 1 previous_frame = gray_current
self.pbar_capture.update(1)
self.pbar.update(1) # Update the progress bar by 1 for each frame self.pbar_capture.close()
self.video.release()
log.info("Analysis complete")
if self.pbar.n < self.total_frames: def stitch(self, dpi=300):
self.pbar.update(self.total_frames - self.pbar.n) log.info("Attempting to stitch by three for every group...")
letter_width = int(8.5 * dpi)
letter_height = int(11 * dpi)
self.cap.release() image_files = sorted(os.listdir(self.raw_dir))
def create_blank_image(self, width, height): if not image_files:
return Image.new("RGB", (width, height), "white") log.error("Found no images for stitching")
exit()
def combine_imgs(self, image_1, image_2, dest, mode="vertical"): for group_index in range(0, len(image_files), 3):
if isinstance(image_1, str): canvas = np.ones((letter_height, letter_width, 3), dtype=np.uint8) * 255
image_1 = Image.open(image_1)
if isinstance(image_2, str):
image_2 = Image.open(image_2)
width_1, height_1 = image_1.size group_files = image_files[group_index : group_index + 3]
width_2, height_2 = image_2.size
if mode == "horizontal": available_height = letter_height // 3
total_width = width_1 + width_2
max_height = max(height_1, height_2)
combined_image = Image.new("RGB", (total_width, max_height))
combined_image.paste(image_1, (0, 0))
combined_image.paste(image_2, (width_1, 0))
elif mode == "vertical": for i, img_file in enumerate(group_files):
max_width = max(width_1, width_2) img_path = os.path.join(self.raw_dir, img_file)
total_height = height_1 + height_2 img = cv2.imread(img_path)
combined_image = Image.new("RGB", (max_width, total_height))
combined_image.paste(image_1, (0, 0))
combined_image.paste(image_2, (0, height_1))
else: if img is None:
raise ValueError("[ERR] Mode must be either 'vertical' or 'horizontal'.") continue
combined_image.save(dest) img_height, img_width = img.shape[:2]
scale_factor = min(
letter_width / img_width, available_height / img_height
)
def combine_in_pairs(self): resized_img_width = int(img_width * scale_factor)
all_files = os.listdir(self.output_dir) resized_img_height = int(img_height * scale_factor)
non_hidden_files = [f for f in all_files if not f.startswith(".")] resized_img = cv2.resize(img, (resized_img_width, resized_img_height))
images = [
f
for f in non_hidden_files
if os.path.isfile(os.path.join(self.output_dir, f))
]
images.sort()
if len(images) % 2 != 0: x_offset = (letter_width - resized_img_width) // 2
last_image = images.pop() y_offset = (
else: available_height - resized_img_height
last_image = None ) // 2 + i * available_height
for img in range(0, len(images), 2): canvas[
image_1 = os.path.join(self.output_dir, images[img]) y_offset : y_offset + resized_img_height,
image_2 = os.path.join(self.output_dir, images[img + 1]) x_offset : x_offset + resized_img_width,
output_filename = f"combined_{img//2 + 1:03}.jpg" ] = resized_img
output_path = os.path.join(self.combined_img_dir, output_filename)
self.combine_imgs(image_1, image_2, output_path, mode="vertical")
if last_image: output_path = os.path.join(
last_image_path = os.path.join(self.output_dir, last_image) self.stitch_dir,
last_image_img = Image.open(last_image_path) f"stitched_page_{group_index // 3 + 1:03}.{self.img_format}",
width, height = last_image_img.size
blank_image = self.create_blank_image(width, height)
blank_image_path = os.path.join(self.output_dir, "blank_image.jpg")
blank_image.save(blank_image_path)
output_filename = f"combined_{len(images)//2 + 1:03}.jpg"
output_path = os.path.join(self.combined_img_dir, output_filename)
self.combine_imgs(
last_image_path, blank_image_path, output_path, mode="vertical"
) )
cv2.imwrite(output_path, canvas)
log.debug(
f"Saved stitched image for page {group_index // 3 + 1}: {output_path}"
)
log.info("Stitching done")
os.remove(blank_image_path) def convert(self):
log.info("Attempting to convert to pdf")
def convert_to_pdf(self): all_entries = os.listdir(self.stitch_dir)
all_entries = os.listdir(self.combined_img_dir)
imgs = [ imgs = [
os.path.join(self.combined_img_dir, entry) os.path.join(self.stitch_dir, entry)
for entry in all_entries for entry in all_entries
if os.path.isfile(os.path.join(self.combined_img_dir, entry)) if os.path.isfile(os.path.join(self.stitch_dir, entry))
and entry.endswith(".jpg")
] ]
imgs.sort()
log.debug(f"images in {self.stitch_dir} to convert: {imgs}")
print(f"[DEBUG] converting these imgs: {imgs} to .pdf") if verbose is True else print("[INFO] Converting to pdf...")
if not imgs: if not imgs:
print("[ERR] No images found for PDF conversion.") log.error("No images found for PDF conversion.")
return exit()
file_name = self.video_title if self.video_title else "output" file_name = self.title if self.title else "output"
pdf_path = os.path.join(self.dest, f"{file_name}.pdf") pdf_path = os.path.join(self.dest, f"{file_name}.pdf")
with open(pdf_path, "wb") as f: with open(pdf_path, "wb") as f:
f.write(img2pdf.convert(imgs)) f.write(img2pdf.convert(imgs)) # ignore the error
print(f"[INFO] Saved file to {pdf_path}") log.info(f"Saved file to {pdf_path}")
def __del__(self):
self.pbar.close() # Close the progress bar when done
self.temp_dir.cleanup()
if __name__ == "__main__": if __name__ == "__main__":
program = Vid2Sheet(src, dest) vid2sheet = Vid2Sheet(src, dest, format, alt_temp, ignore_temp)
program.run() vid2sheet.run()