🎉 init
This commit is contained in:
+280
@@ -0,0 +1,280 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
import os
|
||||
import re
|
||||
import img2pdf
|
||||
import tempfile
|
||||
import yt_dlp
|
||||
import argparse
|
||||
|
||||
from PIL import Image
|
||||
from tqdm import tqdm
|
||||
|
||||
# test
|
||||
# https://www.youtube.com/watch?v=tyloC0e-Tqk
|
||||
# ./
|
||||
|
||||
# this is so overengineered that the bloat takes up 1/4 of the line
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Converts Video static images based on significant frame changes to sheet music in a form of .pdf file.",
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
|
||||
parser.add_argument("source", type=str, help="source of the file or a YouTube link")
|
||||
parser.add_argument("destination", type=str, help="destination of the output file")
|
||||
|
||||
parser.add_argument("-v", "--verbose", action="store_true", help="enable debug mode")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
src = args.source
|
||||
dest = args.destination
|
||||
verbose = args.verbose
|
||||
|
||||
print(f"[INFO] The source file is: {src}")
|
||||
print(f"[INFO] The destination file is: {dest}")
|
||||
print("[INFO] Verbose enabled") if verbose is True else None
|
||||
|
||||
|
||||
class Vid2Sheet:
|
||||
def __init__(self, src, dest, intro_start: int = 0):
|
||||
self.src = src
|
||||
self.dest = dest
|
||||
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.output_dir = self.temp_dir.name
|
||||
self.final = os.path.join(self.output_dir, "final")
|
||||
self.vid_folder = os.path.join(self.output_dir, "vid")
|
||||
self.video_title = None
|
||||
|
||||
os.makedirs(self.final, exist_ok=True)
|
||||
os.makedirs(self.vid_folder, exist_ok=True)
|
||||
os.makedirs(self.dest, exist_ok=True)
|
||||
|
||||
if self.installation():
|
||||
print("[INFO] Finished downloading")
|
||||
all_entries = os.listdir(self.vid_folder)
|
||||
files = [
|
||||
entry
|
||||
for entry in all_entries
|
||||
if os.path.isfile(os.path.join(self.vid_folder, entry))
|
||||
]
|
||||
print(f"[INFO] Found a video {files}, in {self.vid_folder}")
|
||||
|
||||
if files:
|
||||
self.cap = cv2.VideoCapture(os.path.join(self.vid_folder, files[0]))
|
||||
else:
|
||||
print("[ERR] No files found in the directory.")
|
||||
exit()
|
||||
else:
|
||||
self.cap = cv2.VideoCapture(self.src)
|
||||
# Extract the filename without extension as a default title
|
||||
self.video_title = os.path.splitext(os.path.basename(self.src))[0]
|
||||
|
||||
if not self.cap.isOpened():
|
||||
print("[ERR] Could not open video.")
|
||||
exit()
|
||||
|
||||
self.frame_count = intro_start
|
||||
self.extracted_count = 0
|
||||
self.previous_frame = None
|
||||
|
||||
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
(
|
||||
print(f"[DEBUG] Total number of frames in the video: {self.total_frames}")
|
||||
if verbose is True
|
||||
else None
|
||||
)
|
||||
self.pbar_count = 0
|
||||
self.pbar = tqdm(total=self.total_frames)
|
||||
|
||||
def installation(self):
|
||||
youtube_pattern = re.compile(
|
||||
r"(https?://)?(www\.)?"
|
||||
r"(youtube\.com/watch\?v=|youtu\.be/)"
|
||||
r"[a-zA-Z0-9_-]{11}",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if re.match(youtube_pattern, self.src):
|
||||
print("[INFO] Detected YouTube link")
|
||||
ydl_opts = {
|
||||
"outtmpl": f"{self.vid_folder}/%(title)s.%(ext)s",
|
||||
"quiet": True, # Suppress output
|
||||
"progress_hooks": [self._hook],
|
||||
}
|
||||
print("[INFO] Attempting to start download...")
|
||||
try:
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
ydl.download([self.src])
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[ERR] An error occurred: {e}")
|
||||
return False
|
||||
|
||||
def _hook(self, d):
|
||||
if d["status"] == "finished":
|
||||
self.video_title = d.get("info_dict", {}).get("title", "unknown_title")
|
||||
|
||||
def analyze_frame(self, change_threshold=12500000):
|
||||
while True:
|
||||
ret, current_frame = self.cap.read()
|
||||
if not ret:
|
||||
break
|
||||
|
||||
self.gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
if self.previous_frame is None:
|
||||
image_path = os.path.join(
|
||||
self.output_dir, f"image_{self.extracted_count:03}.jpg"
|
||||
)
|
||||
cv2.imwrite(image_path, current_frame)
|
||||
self.extracted_count += 1
|
||||
(
|
||||
self.pbar.set_description(
|
||||
f"[DEBUG] Start at Frame {self.frame_count}, saved as {image_path}"
|
||||
) if verbose is True else self.pbar.set_description("[INFO] Analyzing video...")
|
||||
)
|
||||
|
||||
else:
|
||||
frame_diff = cv2.absdiff(self.previous_frame, self.gray_current)
|
||||
diff_sum = np.sum(frame_diff)
|
||||
|
||||
if diff_sum > change_threshold:
|
||||
image_path = os.path.join(
|
||||
self.output_dir, f"image_{self.extracted_count:03}.jpg"
|
||||
)
|
||||
cv2.imwrite(image_path, current_frame)
|
||||
self.pbar.set_description(
|
||||
f"[DEBUG] Frame {self.frame_count} changed significantly, saved as {image_path}"
|
||||
) if verbose is True else None
|
||||
self.extracted_count += 1
|
||||
|
||||
self.previous_frame = self.gray_current
|
||||
self.frame_count += 1
|
||||
|
||||
self.pbar_count += 1
|
||||
self.pbar.update()
|
||||
|
||||
self.cap.release()
|
||||
|
||||
def create_blank_image(self, width, height):
|
||||
return Image.new("RGB", (width, height), "white")
|
||||
|
||||
def combine_imgs(self, image_1, image_2, dest, mode="vertical"):
|
||||
if isinstance(image_1, str):
|
||||
image_1 = Image.open(image_1)
|
||||
if isinstance(image_2, str):
|
||||
image_2 = Image.open(image_2)
|
||||
|
||||
width_1, height_1 = image_1.size
|
||||
width_2, height_2 = image_2.size
|
||||
|
||||
if mode == "horizontal":
|
||||
total_width = width_1 + width_2
|
||||
max_height = max(height_1, height_2)
|
||||
combined_image = Image.new("RGB", (total_width, max_height))
|
||||
combined_image.paste(image_1, (0, 0))
|
||||
combined_image.paste(image_2, (width_1, 0))
|
||||
|
||||
elif mode == "vertical":
|
||||
max_width = max(width_1, width_2)
|
||||
total_height = height_1 + height_2
|
||||
combined_image = Image.new("RGB", (max_width, total_height))
|
||||
combined_image.paste(image_1, (0, 0))
|
||||
combined_image.paste(image_2, (0, height_1))
|
||||
|
||||
else:
|
||||
raise ValueError("[ERR] Mode must be either 'vertical' or 'horizontal'.")
|
||||
|
||||
combined_image.save(dest)
|
||||
|
||||
def combine_in_pairs(self):
|
||||
all_files = os.listdir(self.output_dir)
|
||||
non_hidden_files = [f for f in all_files if not f.startswith(".")]
|
||||
images = [
|
||||
f
|
||||
for f in non_hidden_files
|
||||
if os.path.isfile(os.path.join(self.output_dir, f))
|
||||
]
|
||||
images.sort(key=natural_keys)
|
||||
|
||||
if len(images) % 2 != 0:
|
||||
last_image = images.pop()
|
||||
else:
|
||||
last_image = None
|
||||
|
||||
for img in range(0, len(images), 2):
|
||||
image_1 = os.path.join(self.output_dir, images[img])
|
||||
image_2 = os.path.join(self.output_dir, images[img + 1])
|
||||
output_filename = f"combined_{img//2 + 1:03}.jpg"
|
||||
output_path = os.path.join(self.final, output_filename)
|
||||
self.combine_imgs(image_1, image_2, output_path, mode="vertical")
|
||||
|
||||
if last_image:
|
||||
last_image_path = os.path.join(self.output_dir, last_image)
|
||||
last_image_img = Image.open(last_image_path)
|
||||
width, height = last_image_img.size
|
||||
blank_image = self.create_blank_image(width, height)
|
||||
|
||||
blank_image_path = os.path.join(self.output_dir, "blank_image.jpg")
|
||||
blank_image.save(blank_image_path)
|
||||
|
||||
output_filename = f"combined_{len(images)//2 + 1:03}.jpg"
|
||||
output_path = os.path.join(self.final, output_filename)
|
||||
self.combine_imgs(
|
||||
last_image_path, blank_image_path, output_path, mode="vertical"
|
||||
)
|
||||
|
||||
os.remove(blank_image_path)
|
||||
|
||||
self.pbar_count += 1
|
||||
self.pbar.update()
|
||||
|
||||
def run(self):
|
||||
self.analyze_frame()
|
||||
self.combine_in_pairs()
|
||||
self.convert_to_pdf()
|
||||
|
||||
def scan_folder(self, src):
|
||||
files_in_folder = []
|
||||
|
||||
for root, dirs, files in os.walk(src):
|
||||
for name in files:
|
||||
if name.endswith((".png", ".jpg", ".jpeg", ".mp4", ".webm")):
|
||||
files_in_folder.append(os.path.join(root, name))
|
||||
|
||||
files_in_folder.sort(key=natural_keys)
|
||||
return files_in_folder
|
||||
|
||||
def convert_to_pdf(self):
|
||||
imgs = self.scan_folder(self.final)
|
||||
if not imgs:
|
||||
print("[ERR] No images found for PDF conversion.")
|
||||
return
|
||||
|
||||
file_name = self.video_title if self.video_title else "output"
|
||||
pdf_path = os.path.join(self.dest, f"{file_name}.pdf")
|
||||
|
||||
with open(pdf_path, "wb") as f:
|
||||
f.write(img2pdf.convert(imgs))
|
||||
|
||||
self.pbar.set_description(f"[INFO] Saved file to {pdf_path}")
|
||||
self.pbar_count += 1
|
||||
self.pbar.update()
|
||||
|
||||
def __del__(self):
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
|
||||
def atoi(text):
|
||||
return int(text) if text.isdigit() else text
|
||||
|
||||
|
||||
def natural_keys(text):
|
||||
return [atoi(c) for c in re.split(r"(\d+)", text)]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
program = Vid2Sheet(src, dest)
|
||||
program.run()
|
||||
Reference in New Issue
Block a user