diff --git a/requirements.txt b/requirements.txt index 6a410f7..2956f7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,5 @@ img2pdf==0.4.4 numpy==2.0.1 opencv-python==4.10.0.84 -pillow==10.4.0 tqdm==4.66.4 yt-dlp==2024.8.1 diff --git a/vid2sheet.py b/vid2sheet.py index 74c22f1..d0799f9 100644 --- a/vid2sheet.py +++ b/vid2sheet.py @@ -1,301 +1,242 @@ -import cv2 -import numpy as np -import os -import re -import img2pdf -import tempfile -import yt_dlp -import argparse +from utils import Download +from utils import Misc +from utils import log +from utils import src, dest, alt_temp, format, ignore_temp, verbose -from PIL import Image from tqdm import tqdm +import tempfile +import os +import cv2 +import re +import numpy as np +import img2pdf +import shutil -# test -# https://www.youtube.com/watch?v=tyloC0e-Tqk -# ./ - -# this is so overengineered that the bloat takes up 1/4 of the line - -parser = argparse.ArgumentParser( - description="Converts Video static images based on significant frame changes to sheet music in a form of .pdf file.", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, -) - -parser.add_argument("source", type=str, help="source of the file or a YouTube link") -parser.add_argument("destination", type=str, help="destination of the output file") - -parser.add_argument("-v", "--verbose", action="store_true", help="enable debug mode") -parser.add_argument( - "-t", - "--change-threshold", - type=int, - default=12500000, - help="take a screenshot based on threshold", -) - -args = parser.parse_args() - -src = args.source -dest = args.destination -verbose = args.verbose -change_threshold = args.change_threshold - -print(f"[INFO] The source file is: {src}") -print(f"[INFO] The destination file is: {dest}") -print("[INFO] Verbose enabled") if verbose is True else None - +# temp +# src: str = "../test/hello.webm" +# src: str = "https://www.youtube.com/watch?v=tyloC0e-Tqk" +# dest: str = "/home/sakamoto/Public/test" class Vid2Sheet: - def __init__(self, src, dest, frame_starts_at: int = 0): + def __init__(self, src, dest, img_format: str, use_tempfile=True, ignore_temp=False): self.src = src self.dest = dest + self.img_format = img_format - self.temp_dir = tempfile.TemporaryDirectory() - self.output_dir = self.temp_dir.name - self.combined_img_dir = os.path.join(self.output_dir, "combined_img") - self.video_dir = os.path.join(self.output_dir, "video") - self.video_title = None + self.download = Download() + self.misc = Misc() - os.makedirs(self.combined_img_dir, exist_ok=True) + self.dest_temp_dir = os.path.join(dest, "tmp") + self.define_temp = ( + tempfile.TemporaryDirectory() if use_tempfile else self.dest_temp_dir + ) + self.temp_dir = ( + self.define_temp.name + if isinstance(self.define_temp, tempfile.TemporaryDirectory) + else self.define_temp + ) + + self.stitch_dir = os.path.join(self.temp_dir, "stitch") + self.video_dir = os.path.join(self.temp_dir, "video") + self.raw_dir = os.path.join(self.temp_dir, "raw") + + os.makedirs(self.dest_temp_dir, exist_ok=True) + os.makedirs(self.temp_dir, exist_ok=True) + os.makedirs(self.stitch_dir, exist_ok=True) os.makedirs(self.video_dir, exist_ok=True) - os.makedirs(self.dest, exist_ok=True) + os.makedirs(self.raw_dir, exist_ok=True) - self.frame_count = frame_starts_at - self.extracted_count = 0 self.previous_frame = None - # self.pbar = None - self.total_frames = 0 + log.info(f"Source: {self.src}") + log.info(f"Destination: {self.dest}") + log.debug(f"Use Tempfile? {use_tempfile}") + log.debug(f"Ignore temp delete? {ignore_temp}") + log.debug(f"Verbose? {verbose}") + log.debug(f"Temporary Directory: {self.temp_dir}") + log.debug(f"Stitch Directory: {self.stitch_dir}") + log.debug(f"Video Directory: {self.video_dir}") - if verbose: - print(f"[DEBUG] temp_dir: {self.temp_dir}") - print(f"[DEBUG] output_dir: {self.output_dir}") - print(f"[DEBUG] combined_img_dir: {self.combined_img_dir}") - print(f"[DEBUG] video_dir: {self.video_dir}") + def __del__(self): + if isinstance(self.define_temp, tempfile.TemporaryDirectory): + log.debug(f"Deleting: {self.temp_dir}") + self.define_temp.cleanup() + else: + try: + if ignore_temp: + log.debug(f"Deleting: {self.dest_temp_dir}") + shutil.rmtree(self.dest_temp_dir) + except Exception as e: + log.error(e) def run(self): - self.check_video() - self.analyze_frame(change_threshold) - self.combine_in_pairs() - self.pbar.close() - self.convert_to_pdf() + self.check() + self.capture() + self.stitch() + self.convert() - def install_yt(self): + def check(self): youtube_pattern = re.compile( r"(https?://)?(www\.)?" r"(youtube\.com/watch\?v=|youtu\.be/)" r"[a-zA-Z0-9_-]{11}", re.IGNORECASE, ) - if re.match(youtube_pattern, self.src): - print("[INFO] Detected YouTube link") - ydl_opts = { - "outtmpl": f"{self.video_dir}/%(title)s.%(ext)s", - "quiet": True, - "progress_hooks": [self._hook], - } - print("[INFO] Attempting to start download...") - try: - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - ydl.download([self.src]) - return True - except Exception as e: - print(f"[ERR] An error occurred: {e}") - return False - def check_video(self): - if self.install_yt(): - print("[INFO] Finished downloading") + playlist_pattern = re.compile( + r"(https?://)?(www\.)?(youtube\.com/playlist\?list=)[a-zA-Z0-9_-]+", + re.IGNORECASE, + ) + + if re.match(youtube_pattern, self.src): + log.info("Detected YouTube link") + self.download.video(self.src, self.video_dir) all_entries = os.listdir(self.video_dir) files = [ - entry + os.path.join(self.video_dir, entry) for entry in all_entries if os.path.isfile(os.path.join(self.video_dir, entry)) ] - print(f"[INFO] Found {files}, in {self.video_dir}") + self.src = os.path.join(self.video_dir, files[0]) - if files: - self.cap = cv2.VideoCapture(os.path.join(self.video_dir, files[0])) - else: - print(f"[ERR] No files found in the directory {self.video_dir}.") - exit() - else: - self.cap = cv2.VideoCapture(self.src) - self.video_title = os.path.splitext(os.path.basename(self.src))[0] + elif re.match(playlist_pattern, self.src): + log.info("Detected YouTube playlist") + self.download.playlist(self.src, self.video_dir) - if not self.cap.isOpened(): - print("[ERR] Could not open video.") - exit() + self.video = cv2.VideoCapture(self.src) + self.title = os.path.splitext(os.path.basename(self.src))[0] - self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) - self.pbar = tqdm( + self.total_frames = int(self.video.get(cv2.CAP_PROP_FRAME_COUNT)) + + if not self.video.isOpened(): + log.error(f"Could not open video: {self.src}") + return + + log.debug(f"Total frames: {self.total_frames}") + + def capture(self, change_threshold=12500000): + count = 0 + frame_count = 0 + previous_frame = None + log.info("Analyzing and capturing caught frames; this may take a while...") + + self.pbar_capture = tqdm( total=self.total_frames, - desc="Analyzing Frames", + desc="Capturing significant changes", bar_format="{l_bar}{bar} | {n_fmt}/{total_fmt} frames | {rate_fmt} | {elapsed} elapsed", ) - ( - print(f"[DEBUG] Total number of frames in the video: {self.total_frames}") - if verbose is True - else None - ) - def _hook(self, d): - if d["status"] == "finished": - self.video_title = d.get("info_dict", {}).get("title", "unknown_title") - - def analyze_frame(self, change_threshold=12500000): - ( - print(f"[DEBUG] Change threshold is set to {change_threshold}") - if verbose is True - else None - ) while True: - ret, current_frame = self.cap.read() + ret, current_frame = self.video.read() + if not ret: break - self.gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY) + gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY) + img_output = os.path.join(self.raw_dir, f"img_{count:03}.{self.img_format}") - if self.previous_frame is None: - image_path = os.path.join( - self.output_dir, f"image_{self.extracted_count:03}.jpg" - ) - cv2.imwrite(image_path, current_frame) - self.extracted_count += 1 - ( - self.pbar.set_description( - f"[DEBUG] Start at Frame {self.frame_count}, saved as {image_path}" - ) - if verbose is True - else self.pbar.set_description( - f"[INFO] Analyzing {self.video_title}" - ) - ) + if previous_frame is None: + log.debug(f"Starting at frame {frame_count}") + cv2.imwrite(img_output, current_frame) + count += 1 else: - frame_diff = cv2.absdiff(self.previous_frame, self.gray_current) + frame_diff = cv2.absdiff(previous_frame, gray_current) diff_sum = np.sum(frame_diff) if diff_sum > change_threshold: - image_path = os.path.join( - self.output_dir, f"image_{self.extracted_count:03}.jpg" + log.debug( + f"Significant change found at frame {frame_count}, saving to {os.path.join(self.raw_dir, f"{img_output}")}" ) - cv2.imwrite(image_path, current_frame) - ( - self.pbar.set_description( - f"[DEBUG] Frame {self.frame_count} changed significantly, saved as {image_path}" - ) - if verbose is True - else None - ) - self.extracted_count += 1 + cv2.imwrite(img_output, current_frame) + count += 1 - self.previous_frame = self.gray_current - self.frame_count += 1 + frame_count += 1 + previous_frame = gray_current + self.pbar_capture.update(1) - self.pbar.update(1) # Update the progress bar by 1 for each frame + self.pbar_capture.close() + self.video.release() + log.info("Analysis complete") - if self.pbar.n < self.total_frames: - self.pbar.update(self.total_frames - self.pbar.n) + def stitch(self, dpi=300): + log.info("Attempting to stitch by three for every group...") + letter_width = int(8.5 * dpi) + letter_height = int(11 * dpi) - self.cap.release() + image_files = sorted(os.listdir(self.raw_dir)) - def create_blank_image(self, width, height): - return Image.new("RGB", (width, height), "white") + if not image_files: + log.error("Found no images for stitching") + exit() - def combine_imgs(self, image_1, image_2, dest, mode="vertical"): - if isinstance(image_1, str): - image_1 = Image.open(image_1) - if isinstance(image_2, str): - image_2 = Image.open(image_2) + for group_index in range(0, len(image_files), 3): + canvas = np.ones((letter_height, letter_width, 3), dtype=np.uint8) * 255 - width_1, height_1 = image_1.size - width_2, height_2 = image_2.size + group_files = image_files[group_index : group_index + 3] - if mode == "horizontal": - total_width = width_1 + width_2 - max_height = max(height_1, height_2) - combined_image = Image.new("RGB", (total_width, max_height)) - combined_image.paste(image_1, (0, 0)) - combined_image.paste(image_2, (width_1, 0)) + available_height = letter_height // 3 - elif mode == "vertical": - max_width = max(width_1, width_2) - total_height = height_1 + height_2 - combined_image = Image.new("RGB", (max_width, total_height)) - combined_image.paste(image_1, (0, 0)) - combined_image.paste(image_2, (0, height_1)) + for i, img_file in enumerate(group_files): + img_path = os.path.join(self.raw_dir, img_file) + img = cv2.imread(img_path) - else: - raise ValueError("[ERR] Mode must be either 'vertical' or 'horizontal'.") + if img is None: + continue - combined_image.save(dest) + img_height, img_width = img.shape[:2] + scale_factor = min( + letter_width / img_width, available_height / img_height + ) - def combine_in_pairs(self): - all_files = os.listdir(self.output_dir) - non_hidden_files = [f for f in all_files if not f.startswith(".")] - images = [ - f - for f in non_hidden_files - if os.path.isfile(os.path.join(self.output_dir, f)) - ] - images.sort() + resized_img_width = int(img_width * scale_factor) + resized_img_height = int(img_height * scale_factor) + resized_img = cv2.resize(img, (resized_img_width, resized_img_height)) - if len(images) % 2 != 0: - last_image = images.pop() - else: - last_image = None + x_offset = (letter_width - resized_img_width) // 2 + y_offset = ( + available_height - resized_img_height + ) // 2 + i * available_height - for img in range(0, len(images), 2): - image_1 = os.path.join(self.output_dir, images[img]) - image_2 = os.path.join(self.output_dir, images[img + 1]) - output_filename = f"combined_{img//2 + 1:03}.jpg" - output_path = os.path.join(self.combined_img_dir, output_filename) - self.combine_imgs(image_1, image_2, output_path, mode="vertical") + canvas[ + y_offset : y_offset + resized_img_height, + x_offset : x_offset + resized_img_width, + ] = resized_img - if last_image: - last_image_path = os.path.join(self.output_dir, last_image) - last_image_img = Image.open(last_image_path) - width, height = last_image_img.size - blank_image = self.create_blank_image(width, height) - - blank_image_path = os.path.join(self.output_dir, "blank_image.jpg") - blank_image.save(blank_image_path) - - output_filename = f"combined_{len(images)//2 + 1:03}.jpg" - output_path = os.path.join(self.combined_img_dir, output_filename) - self.combine_imgs( - last_image_path, blank_image_path, output_path, mode="vertical" + output_path = os.path.join( + self.stitch_dir, + f"stitched_page_{group_index // 3 + 1:03}.{self.img_format}", ) + cv2.imwrite(output_path, canvas) + log.debug( + f"Saved stitched image for page {group_index // 3 + 1}: {output_path}" + ) + log.info("Stitching done") - os.remove(blank_image_path) - - def convert_to_pdf(self): - all_entries = os.listdir(self.combined_img_dir) + def convert(self): + log.info("Attempting to convert to pdf") + all_entries = os.listdir(self.stitch_dir) imgs = [ - os.path.join(self.combined_img_dir, entry) + os.path.join(self.stitch_dir, entry) for entry in all_entries - if os.path.isfile(os.path.join(self.combined_img_dir, entry)) - and entry.endswith(".jpg") + if os.path.isfile(os.path.join(self.stitch_dir, entry)) ] + imgs.sort() + log.debug(f"images in {self.stitch_dir} to convert: {imgs}") - print(f"[DEBUG] converting these imgs: {imgs} to .pdf") if verbose is True else print("[INFO] Converting to pdf...") if not imgs: - print("[ERR] No images found for PDF conversion.") - return + log.error("No images found for PDF conversion.") + exit() - file_name = self.video_title if self.video_title else "output" + file_name = self.title if self.title else "output" pdf_path = os.path.join(self.dest, f"{file_name}.pdf") with open(pdf_path, "wb") as f: - f.write(img2pdf.convert(imgs)) + f.write(img2pdf.convert(imgs)) # ignore the error - print(f"[INFO] Saved file to {pdf_path}") + log.info(f"Saved file to {pdf_path}") - def __del__(self): - self.pbar.close() # Close the progress bar when done - self.temp_dir.cleanup() if __name__ == "__main__": - program = Vid2Sheet(src, dest) - program.run() + vid2sheet = Vid2Sheet(src, dest, format, alt_temp, ignore_temp) + vid2sheet.run()